Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ cef3f99f

History | View | Annotate | Download (12.2 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the job queue.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2010, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.JQueue
29
    ( QueuedOpCode(..)
30
    , QueuedJob(..)
31
    , InputOpCode(..)
32
    , queuedOpCodeFromMetaOpCode
33
    , queuedJobFromOpCodes
34
    , Timestamp
35
    , noTimestamp
36
    , opStatusFinalized
37
    , extractOpSummary
38
    , calcJobStatus
39
    , calcJobPriority
40
    , jobFileName
41
    , liveJobFile
42
    , archivedJobFile
43
    , determineJobDirectories
44
    , getJobIDs
45
    , sortJobIDs
46
    , loadJobFromDisk
47
    , noSuchJob
48
    , readSerialFromDisk
49
    ) where
50

    
51
import Control.Exception
52
import Control.Monad
53
import Data.List
54
import Data.Maybe
55
import Data.Ord (comparing)
56
-- Work around what seems to be a bug in GHC 7.4's TH shadowing code.
57
import Prelude hiding (log, id)
58
import System.Directory
59
import System.FilePath
60
import System.IO.Error (isDoesNotExistError)
61
import System.Posix.Files
62
import qualified Text.JSON
63
import Text.JSON.Types
64

    
65
import Ganeti.BasicTypes
66
import qualified Ganeti.Constants as C
67
import Ganeti.JSON
68
import Ganeti.Logging
69
import Ganeti.OpCodes
70
import Ganeti.Path
71
import Ganeti.THH
72
import Ganeti.Types
73
import Ganeti.Utils
74

    
75
-- * Data types
76

    
77
-- | Timestamp as used by the Ganeti job queue: a pair of integers.
type Timestamp = (Int, Int)

-- | Placeholder value standing in for a missing timestamp; both
-- components are set to @-1@.
noTimestamp :: Timestamp
noTimestamp = (-1, -1)
83

    
84
-- | An opcode as read from the queue input: either a successfully
-- parsed 'MetaOpCode' or the raw JSON value that could not be parsed.
data InputOpCode
  = ValidOpCode MetaOpCode  -- ^ OpCode was parsed successfully
  | InvalidOpCode JSValue   -- ^ Invalid opcode, kept as raw JSON
  deriving (Show, Eq)
88

    
89
-- | JSON instance for 'InputOpCode'. Reading never fails: when the
-- value cannot be parsed as an opcode, the original 'JSValue' is kept
-- wrapped in 'InvalidOpCode'. Showing an invalid opcode returns that
-- stored value unchanged.
instance Text.JSON.JSON InputOpCode where
  showJSON input =
    case input of
      ValidOpCode mo -> Text.JSON.showJSON mo
      InvalidOpCode inv -> inv
  readJSON v =
    return $ case Text.JSON.readJSON v of
               Text.JSON.Ok mo -> ValidOpCode mo
               Text.JSON.Error _ -> InvalidOpCode v
97

    
98
-- | Summary string used for opcodes whose real summary cannot be
-- determined.
invalidOp :: String
invalidOp = "INVALID_OP"
101

    
102
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
-- duplicates some functionality from the 'opSummary' function in
-- "Ganeti.OpCodes".
--
-- For a valid opcode the summary comes from 'opSummary'. For an invalid
-- opcode that is a JSON object, the @OP_ID@ field is looked up (with a
-- default of @OP_@ followed by 'invalidOp') and its first three
-- characters, the @OP_@ prefix, are dropped. Anything else yields
-- 'invalidOp'.
extractOpSummary :: InputOpCode -> String
extractOpSummary (ValidOpCode metaop) = opSummary (metaOpCode metaop)
extractOpSummary (InvalidOpCode (JSObject o)) =
  -- drop 3 removes the OP_ prefix
  maybe invalidOp (drop 3) $
    fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp)
extractOpSummary _ = invalidOp
112

    
113
-- Record type for a queued opcode, generated through 'buildObject' with
-- the field prefix "qo". The "log" field defaults to the empty list and
-- the three timestamp fields are optional.
$(buildObject "QueuedOpCode" "qo"
  [ simpleField "input"    [t| InputOpCode |]
  , simpleField "status"   [t| OpStatus    |]
  , simpleField "result"   [t| JSValue     |]
  , defaultField [| [] |]
      (simpleField "log" [t| [(Int, Timestamp, ELogType, JSValue)] |])
  , simpleField "priority" [t| Int         |]
  , optionalNullSerField
      (simpleField "start_timestamp" [t| Timestamp |])
  , optionalNullSerField
      (simpleField "exec_timestamp"  [t| Timestamp |])
  , optionalNullSerField
      (simpleField "end_timestamp"   [t| Timestamp |])
  ])
127

    
128
-- Record type for a queued job, generated through 'buildObject' with
-- the field prefix "qj". It holds the job ID, its list of queued
-- opcodes and three optional timestamps.
$(buildObject "QueuedJob" "qj"
  [ simpleField "id"  [t| JobId          |]
  , simpleField "ops" [t| [QueuedOpCode] |]
  , optionalNullSerField
      (simpleField "received_timestamp" [t| Timestamp |])
  , optionalNullSerField
      (simpleField "start_timestamp"    [t| Timestamp |])
  , optionalNullSerField
      (simpleField "end_timestamp"      [t| Timestamp |])
  ])
138

    
139
-- | Wraps a 'MetaOpCode' into a fresh 'QueuedOpCode': status queued,
-- empty log, null result, no timestamps, and the priority taken from
-- the opcode's meta parameters.
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode
    { qoInput = ValidOpCode op
    , qoStatus = OP_STATUS_QUEUED
    , qoResult = JSNull
    , qoLog = []
    , qoPriority = opSubmitPriorityToRaw (opPriority (metaParams op))
    , qoStartTimestamp = Nothing
    , qoExecTimestamp = Nothing
    , qoEndTimestamp = Nothing
    }
152

    
153
-- | Builds a job from a job ID and a list of opcodes. This is the pure
-- part of job creation; allocating a new job ID lives in IO.
--
-- Each opcode first goes through 'resolveDependencies' with the job ID,
-- in the caller's monad. The resulting job has no timestamps set.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops = do
  resolved <- forM ops (`resolveDependencies` jobid)
  let queued = map queuedOpCodeFromMetaOpCode resolved
  return QueuedJob { qjId = jobid
                   , qjOps = queued
                   , qjReceivedTimestamp = Nothing
                   , qjStartTimestamp = Nothing
                   , qjEndTimestamp = Nothing
                   }
165

    
166
-- | Prefix shared by all job file names.
jobFilePrefix :: String
jobFilePrefix = "job-"
169

    
170
-- | File name (without directory) for a job: 'jobFilePrefix' followed
-- by the shown numeric job ID.
jobFileName :: JobId -> FilePath
jobFileName = (jobFilePrefix ++) . show . fromJobId
173

    
174
-- | Recovers a job ID from a job file name. Fails in the caller's
-- monad when the name lacks 'jobFilePrefix'; otherwise the remainder
-- of the name is handed to 'makeJobIdS'.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  maybe (fail msg) makeJobIdS (stripPrefix jobFilePrefix path)
  where msg = "Job file '" ++ path ++ "' doesn't have the correct prefix"
181

    
182
-- | Full path of a live (non-archived) job file: the job's file name
-- directly under the queue root directory.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir = (rootdir </>) . jobFileName
185

    
186
-- | Full path of an archived job file. Archived jobs are grouped in
-- subdirectories of the archive directory, named after the job ID
-- integer-divided by 'C.jstoreJobsPerArchiveDirectory'.
--
-- NOTE(review): the previous comment on this function flagged it as
-- "BROKEN" without saying why; confirm whether that still applies.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  rootdir </> jobQueueArchiveSubDir </> bucket </> jobFileName jid
  where bucket = show $ fromJobId jid `div` C.jstoreJobsPerArchiveDirectory
191

    
192
-- | Translates an opcode status into the job status of the same name.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob st =
  case st of
    OP_STATUS_QUEUED    -> JOB_STATUS_QUEUED
    OP_STATUS_WAITING   -> JOB_STATUS_WAITING
    OP_STATUS_SUCCESS   -> JOB_STATUS_SUCCESS
    OP_STATUS_RUNNING   -> JOB_STATUS_RUNNING
    OP_STATUS_CANCELING -> JOB_STATUS_CANCELING
    OP_STATUS_CANCELED  -> JOB_STATUS_CANCELED
    OP_STATUS_ERROR     -> JOB_STATUS_ERROR
201

    
202
-- | Derives a job's status from the statuses of its opcodes, scanned
-- in order:
--
-- * the first error, canceling or canceled opcode decides the job
--   status immediately;
--
-- * success and queued opcodes leave the pending status untouched;
--
-- * any other opcode status replaces the pending status.
--
-- When the list is exhausted, the job is successful if every opcode
-- was successful (this includes the empty list); otherwise the pending
-- status, which starts out as queued, is returned.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
  go JOB_STATUS_QUEUED True (map qoStatus ops)
    where
      isTerminal = (`elem` [ OP_STATUS_ERROR
                           , OP_STATUS_CANCELING
                           , OP_STATUS_CANCELED ])
      isSoft = (`elem` [OP_STATUS_SUCCESS, OP_STATUS_QUEUED])
      -- arguments: pending status, "all successful so far", statuses left
      go _ True [] = JOB_STATUS_SUCCESS
      go pending False [] = pending
      go pending allOk (st:rest)
        | isTerminal st = opStatusToJob st -- abort recursion
        | isSoft st     = go pending allOk' rest -- pending unchanged
        | otherwise     = go (opStatusToJob st) allOk' rest
        where allOk' = allOk && st == OP_STATUS_SUCCESS
221

    
222
-- | Tells whether an opcode status is finalized, i.e. whether it
-- compares greater than 'OP_STATUS_RUNNING' under the 'Ord' instance
-- of 'OpStatus'.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized status = status > OP_STATUS_RUNNING
225

    
226
-- | Computes a job's priority: the minimum priority value among its
-- opcodes that are not yet finalized, or 'C.opPrioDefault' when there
-- are no such opcodes.
calcJobPriority :: QueuedJob -> Int
calcJobPriority QueuedJob { qjOps = ops } =
  case [qoPriority op | op <- ops, not (opStatusFinalized (qoStatus op))] of
    [] -> C.opPrioDefault
    prios -> minimum prios
232

    
233
-- | Exception handler that logs an 'IOError' as a warning and then
-- returns the given fallback value instead of propagating the error.
-- When the flag is set, "does not exist" errors are not logged at all.
-- The logged text is the message, a colon and the shown error.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  let silenced = ignore_noent && isDoesNotExistError e
  unless silenced $ logWarning (msg ++ ": " ++ show e)
  return a
239

    
240
-- | Compute the list of existing archive directories. Note that I/O
-- exceptions are swallowed and ignored: a failure to list the archive
-- directory is logged and yields an empty list, and an entry that
-- cannot be stat'ed is dropped (silently if it no longer exists).
--
-- Entries whose name starts with a dot are skipped. The returned paths
-- include the archive directory prefix.
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  -- The entries of fpaths already carry the adir prefix, so they are
  -- stat'ed as-is; prefixing them again would point to a non-existent
  -- path whenever rootdir is relative.
  filterM (\path ->
             liftM isDirectory (getFileStatus path)
               `Control.Exception.catch`
               ignoreIOError False True
                 ("Failed to stat archive path " ++ path)) fpaths
254

    
255
-- | Builds the list of directories that contain job files: always the
-- queue root directory, followed by all archive directories when the
-- flag asks for archived jobs. Note: compared to the Python version,
-- this doesn't ignore a potential lost+found file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived =
  liftM (rootdir :) archiveDirs
  where archiveDirs | archived  = allArchiveDirs rootdir
                    | otherwise = return []
264

    
265
-- Equivalent of the \'sequence\' function for a list of 'Either'
-- values; \'sequence\' itself cannot be used because of a library
-- version conflict on Lucid.
-- FIXME: delete this and just use \'sequence\' once Lucid compatibility
-- is no longer required.
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
sequencer = fmap reverse . foldl seqFolder (Right [])
271

    
272
-- | Folding step used by 'sequencer'. An error already present in the
-- accumulator is kept, an error in the new element replaces a good
-- accumulator, and a good element is prepended to the accumulated
-- list (so the accumulator ends up in reverse order).
seqFolder :: Either IOError [[JobId]]
          -> Either IOError [JobId]
          -> Either IOError [[JobId]]
seqFolder acc item =
  case (acc, item) of
    (Left e, _) -> Left e
    (Right _, Left e) -> Left e
    (Right l, Right el) -> Right (el : l)
279

    
280
-- | Collects the job IDs found in all the given directories, in
-- directory order. If listing any directory fails, the first such
-- error is returned instead.
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
getJobIDs paths = do
  perDir <- mapM getDirJobIDs paths
  return . fmap concat $ sequencer perDir
283

    
284
-- | Sorts a list of job IDs by their numeric value.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy byNumericId
  where byNumericId = comparing fromJobId
287

    
288
-- | Computes the list of jobs in a given directory.
--
-- If the directory cannot be listed, a warning is logged and the
-- 'IOError' is returned. Otherwise every entry that parses as a job
-- file name contributes its job ID, in the order the directory listing
-- returned them; all other entries are skipped.
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
getDirJobIDs path = do
  either_contents <-
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
  case either_contents of
    Left e -> do
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
      return $ Left e
    -- parseJobFileId is run in the Maybe monad, so unparsable names
    -- come back as Nothing and are dropped by mapMaybe.
    Right contents -> return . Right $ mapMaybe parseJobFileId contents
303

    
304
-- | Reads the job data from disk.
--
-- The live job file is tried first and, when the flag asks for it, the
-- archived job file second. The contents of the first file that can be
-- read are returned together with a flag telling whether it was the
-- archived one; later candidates are then not opened at all. A file
-- that does not exist is skipped silently, any other I/O error is
-- logged as a warning and skipped. 'Nothing' means no candidate could
-- be read.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid = firstReadable all_paths
  where
    live = (liveJobFile rootdir jid, False)
    all_paths = if archived
                  then [live, (archivedJobFile rootdir jid, True)]
                  else [live]
    firstReadable [] = return Nothing
    firstReadable ((path, isarchived):rest) = do
      found <- liftM (\r -> Just (r, isarchived)) (readFile path)
                 `Control.Exception.catch`
                 ignoreIOError Nothing True
                   ("Failed to read job file " ++ path)
      -- stop at the first hit, so no further file handle is opened
      case found of
        Nothing -> firstReadable rest
        Just _ -> return found
317

    
318
-- | Error result used when a job file cannot be loaded.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"
321

    
322
-- | Loads a job from disk and decodes it from JSON. The result pairs
-- the job with a flag telling whether it came from the archive;
-- 'noSuchJob' is returned when no job file could be read.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: we need some strictness below, otherwise the wrapping in a
  -- Result will create too much laziness, and not close the file
  -- descriptors for the individual jobs
  return $! maybe noSuchJob parseJob raw
  where
    parseJob (str, arch) =
      liftM (\qj -> (qj, arch)) .
      fromJResult "Parsing job file" $ Text.JSON.decode str
334

    
335
-- | Reads the job serial number from the queue's serial file. The
-- file contents go through 'rStripSpace' and are then parsed with
-- 'makeJobIdS'; reading is wrapped in 'tryAndLogIOError' with the
-- message given below.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk =
  jobQueueSerialFile >>= \filename ->
    tryAndLogIOError (readFile filename) "Failed to read serial file"
                     (makeJobIdS . rStripSpace)