Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ 4b49a72b

History | View | Annotate | Download (11.3 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the job queue.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2010, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.JQueue
29
    ( QueuedOpCode(..)
30
    , QueuedJob(..)
31
    , InputOpCode(..)
32
    , queuedOpCodeFromMetaOpCode
33
    , Timestamp
34
    , noTimestamp
35
    , opStatusFinalized
36
    , extractOpSummary
37
    , calcJobStatus
38
    , calcJobPriority
39
    , jobFileName
40
    , liveJobFile
41
    , archivedJobFile
42
    , determineJobDirectories
43
    , getJobIDs
44
    , sortJobIDs
45
    , loadJobFromDisk
46
    , noSuchJob
47
    ) where
48

    
49
import Control.Exception
50
import Control.Monad
51
import Data.List
52
import Data.Maybe
53
import Data.Ord (comparing)
54
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
55
import Prelude hiding (log, id)
56
import System.Directory
57
import System.FilePath
58
import System.IO.Error (isDoesNotExistError)
59
import System.Posix.Files
60
import qualified Text.JSON
61
import Text.JSON.Types
62

    
63
import Ganeti.BasicTypes
64
import qualified Ganeti.Constants as C
65
import Ganeti.JSON
66
import Ganeti.Logging
67
import Ganeti.OpCodes
68
import Ganeti.Path
69
import Ganeti.THH
70
import Ganeti.Types
71

    
72
-- * Data types
73

    
74
-- | The ganeti queue timestamp type
75
type Timestamp = (Int, Int)
76

    
77
-- | Missing timestamp type.
78
noTimestamp :: Timestamp
79
noTimestamp = (-1, -1)
80

    
81
-- | An input opcode.
82
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
83
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
84
                   deriving (Show, Eq)
85

    
86
-- | JSON instance for 'InputOpCode', trying to parse it and if
87
-- failing, keeping the original JSValue.
88
instance Text.JSON.JSON InputOpCode where
89
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
90
  showJSON (InvalidOpCode inv) = inv
91
  readJSON v = case Text.JSON.readJSON v of
92
                 Text.JSON.Error _ -> return $ InvalidOpCode v
93
                 Text.JSON.Ok mo -> return $ ValidOpCode mo
94

    
95
-- | Invalid opcode summary.
96
invalidOp :: String
97
invalidOp = "INVALID_OP"
98

    
99
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
100
-- duplicates some functionality from the 'opSummary' function in
101
-- "Ganeti.OpCodes".
102
extractOpSummary :: InputOpCode -> String
103
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
104
extractOpSummary (InvalidOpCode (JSObject o)) =
105
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
106
    Just s -> drop 3 s -- drop the OP_ prefix
107
    Nothing -> invalidOp
108
extractOpSummary _ = invalidOp
109

    
110
$(buildObject "QueuedOpCode" "qo"
111
  [ simpleField "input"           [t| InputOpCode |]
112
  , simpleField "status"          [t| OpStatus    |]
113
  , simpleField "result"          [t| JSValue     |]
114
  , defaultField [| [] |] $
115
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
116
  , simpleField "priority"        [t| Int         |]
117
  , optionalNullSerField $
118
    simpleField "start_timestamp" [t| Timestamp   |]
119
  , optionalNullSerField $
120
    simpleField "exec_timestamp"  [t| Timestamp   |]
121
  , optionalNullSerField $
122
    simpleField "end_timestamp"   [t| Timestamp   |]
123
  ])
124

    
125
$(buildObject "QueuedJob" "qj"
126
  [ simpleField "id"                 [t| JobId          |]
127
  , simpleField "ops"                [t| [QueuedOpCode] |]
128
  , optionalNullSerField $
129
    simpleField "received_timestamp" [t| Timestamp      |]
130
  , optionalNullSerField $
131
    simpleField "start_timestamp"    [t| Timestamp      |]
132
  , optionalNullSerField $
133
    simpleField "end_timestamp"      [t| Timestamp      |]
134
  ])
135

    
136
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
137
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
138
queuedOpCodeFromMetaOpCode op =
139
  QueuedOpCode { qoInput = ValidOpCode op
140
               , qoStatus = OP_STATUS_QUEUED
141
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
142
                              $ op
143
               , qoLog = []
144
               , qoResult = JSNull
145
               , qoStartTimestamp = Nothing
146
               , qoEndTimestamp = Nothing
147
               , qoExecTimestamp = Nothing
148
               }
149

    
150
-- | Job file prefix.
151
jobFilePrefix :: String
152
jobFilePrefix = "job-"
153

    
154
-- | Computes the filename for a given job ID.
155
jobFileName :: JobId -> FilePath
156
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
157

    
158
-- | Parses a job ID from a file name.
159
parseJobFileId :: (Monad m) => FilePath -> m JobId
160
parseJobFileId path =
161
  case stripPrefix jobFilePrefix path of
162
    Nothing -> fail $ "Job file '" ++ path ++
163
                      "' doesn't have the correct prefix"
164
    Just suffix -> makeJobIdS suffix
165

    
166
-- | Computes the full path to a live job.
167
liveJobFile :: FilePath -> JobId -> FilePath
168
liveJobFile rootdir jid = rootdir </> jobFileName jid
169

    
170
-- | Computes the full path to an archives job. BROKEN.
171
archivedJobFile :: FilePath -> JobId -> FilePath
172
archivedJobFile rootdir jid =
173
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
174
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
175

    
176
-- | Map from opcode status to job status.
177
opStatusToJob :: OpStatus -> JobStatus
178
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
179
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
180
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
181
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
182
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
183
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
184
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR
185

    
186
-- | Computes a queued job's status.
187
calcJobStatus :: QueuedJob -> JobStatus
188
calcJobStatus QueuedJob { qjOps = ops } =
189
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
190
    where
191
      terminalStatus OP_STATUS_ERROR     = True
192
      terminalStatus OP_STATUS_CANCELING = True
193
      terminalStatus OP_STATUS_CANCELED  = True
194
      terminalStatus _                   = False
195
      softStatus     OP_STATUS_SUCCESS   = True
196
      softStatus     OP_STATUS_QUEUED    = True
197
      softStatus     _                   = False
198
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
199
      extractOpSt [] d False = d
200
      extractOpSt (x:xs) d old_all
201
           | terminalStatus x = opStatusToJob x -- abort recursion
202
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
203
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
204
           where new_all = x == OP_STATUS_SUCCESS && old_all
205

    
206
-- | Determine whether an opcode status is finalized.
207
opStatusFinalized :: OpStatus -> Bool
208
opStatusFinalized = (> OP_STATUS_RUNNING)
209

    
210
-- | Compute a job's priority.
211
calcJobPriority :: QueuedJob -> Int
212
calcJobPriority QueuedJob { qjOps = ops } =
213
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
214
    where helper [] = C.opPrioDefault
215
          helper ps = minimum ps
216

    
217
-- | Log but ignore an 'IOError'.
218
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
219
ignoreIOError a ignore_noent msg e = do
220
  unless (isDoesNotExistError e && ignore_noent) .
221
    logWarning $ msg ++ ": " ++ show e
222
  return a
223

    
224
-- | Compute the list of existing archive directories. Note that I/O
225
-- exceptions are swallowed and ignored.
226
allArchiveDirs :: FilePath -> IO [FilePath]
227
allArchiveDirs rootdir = do
228
  let adir = rootdir </> jobQueueArchiveSubDir
229
  contents <- getDirectoryContents adir `Control.Exception.catch`
230
               ignoreIOError [] False
231
                 ("Failed to list queue directory " ++ adir)
232
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
233
  filterM (\path ->
234
             liftM isDirectory (getFileStatus (adir </> path))
235
               `Control.Exception.catch`
236
               ignoreIOError False True
237
                 ("Failed to stat archive path " ++ path)) fpaths
238

    
239
-- | Build list of directories containing job files. Note: compared to
240
-- the Python version, this doesn't ignore a potential lost+found
241
-- file.
242
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
243
determineJobDirectories rootdir archived = do
244
  other <- if archived
245
             then allArchiveDirs rootdir
246
             else return []
247
  return $ rootdir:other
248

    
249
-- Function equivalent to the \'sequence\' function, that cannot be used because
250
-- of library version conflict on Lucid.
251
-- FIXME: delete this and just use \'sequence\' instead when Lucid compatibility
252
-- will not be required anymore.
253
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
254
sequencer l = fmap reverse $ foldl seqFolder (Right []) l
255

    
256
-- | Folding function for joining multiple [JobIds] into one list.
257
seqFolder :: Either IOError [[JobId]]
258
          -> Either IOError [JobId]
259
          -> Either IOError [[JobId]]
260
seqFolder (Left e) _ = Left e
261
seqFolder (Right _) (Left e) = Left e
262
seqFolder (Right l) (Right el) = Right $ el:l
263

    
264
-- | Computes the list of all jobs in the given directories.
265
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
266
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)
267

    
268
-- | Sorts the a list of job IDs.
269
sortJobIDs :: [JobId] -> [JobId]
270
sortJobIDs = sortBy (comparing fromJobId)
271

    
272
-- | Computes the list of jobs in a given directory.
273
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
274
getDirJobIDs path = do
275
  either_contents <-
276
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
277
  case either_contents of
278
    Left e -> do
279
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
280
      return $ Left e
281
    Right contents -> do
282
      let jids = foldl (\ids file ->
283
                         case parseJobFileId file of
284
                           Nothing -> ids
285
                           Just new_id -> new_id:ids) [] contents
286
      return . Right $ reverse jids
287

    
288
-- | Reads the job data from disk.
289
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
290
readJobDataFromDisk rootdir archived jid = do
291
  let live_path = liveJobFile rootdir jid
292
      archived_path = archivedJobFile rootdir jid
293
      all_paths = if archived
294
                    then [(live_path, False), (archived_path, True)]
295
                    else [(live_path, False)]
296
  foldM (\state (path, isarchived) ->
297
           liftM (\r -> Just (r, isarchived)) (readFile path)
298
             `Control.Exception.catch`
299
             ignoreIOError state True
300
               ("Failed to read job file " ++ path)) Nothing all_paths
301

    
302
-- | Failed to load job error.
303
noSuchJob :: Result (QueuedJob, Bool)
304
noSuchJob = Bad "Can't load job file"
305

    
306
-- | Loads a job from disk.
307
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
308
loadJobFromDisk rootdir archived jid = do
309
  raw <- readJobDataFromDisk rootdir archived jid
310
  -- note: we need some stricness below, otherwise the wrapping in a
311
  -- Result will create too much lazyness, and not close the file
312
  -- descriptors for the individual jobs
313
  return $! case raw of
314
             Nothing -> noSuchJob
315
             Just (str, arch) ->
316
               liftM (\qj -> (qj, arch)) .
317
               fromJResult "Parsing job file" $ Text.JSON.decode str