Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ 11e90588

History | View | Annotate | Download (10.7 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the job queue.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2010, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.JQueue
29
    ( QueuedOpCode(..)
30
    , QueuedJob(..)
31
    , InputOpCode(..)
32
    , Timestamp
33
    , noTimestamp
34
    , opStatusFinalized
35
    , extractOpSummary
36
    , calcJobStatus
37
    , calcJobPriority
38
    , jobFileName
39
    , liveJobFile
40
    , archivedJobFile
41
    , determineJobDirectories
42
    , getJobIDs
43
    , sortJobIDs
44
    , loadJobFromDisk
45
    , noSuchJob
46
    ) where
47

    
48
import Control.Exception
49
import Control.Monad
50
import Data.List
51
import Data.Ord (comparing)
52
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
53
import Prelude hiding (log, id)
54
import System.Directory
55
import System.FilePath
56
import System.IO.Error (isDoesNotExistError)
57
import System.Posix.Files
58
import qualified Text.JSON
59
import Text.JSON.Types
60

    
61
import Ganeti.BasicTypes
62
import qualified Ganeti.Constants as C
63
import Ganeti.JSON
64
import Ganeti.Logging
65
import Ganeti.OpCodes
66
import Ganeti.Path
67
import Ganeti.THH
68
import Ganeti.Types
69

    
70
-- * Data types
71

    
72
-- | The ganeti queue timestamp type
73
type Timestamp = (Int, Int)
74

    
75
-- | Missing timestamp type.
76
noTimestamp :: Timestamp
77
noTimestamp = (-1, -1)
78

    
79
-- | An input opcode.
80
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
81
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
82
                   deriving (Show, Eq)
83

    
84
-- | JSON instance for 'InputOpCode', trying to parse it and if
85
-- failing, keeping the original JSValue.
86
instance Text.JSON.JSON InputOpCode where
87
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
88
  showJSON (InvalidOpCode inv) = inv
89
  readJSON v = case Text.JSON.readJSON v of
90
                 Text.JSON.Error _ -> return $ InvalidOpCode v
91
                 Text.JSON.Ok mo -> return $ ValidOpCode mo
92

    
93
-- | Invalid opcode summary.
94
invalidOp :: String
95
invalidOp = "INVALID_OP"
96

    
97
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
98
-- duplicates some functionality from the 'opSummary' function in
99
-- "Ganeti.OpCodes".
100
extractOpSummary :: InputOpCode -> String
101
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
102
extractOpSummary (InvalidOpCode (JSObject o)) =
103
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
104
    Just s -> drop 3 s -- drop the OP_ prefix
105
    Nothing -> invalidOp
106
extractOpSummary _ = invalidOp
107

    
108
$(buildObject "QueuedOpCode" "qo"
109
  [ simpleField "input"           [t| InputOpCode |]
110
  , simpleField "status"          [t| OpStatus    |]
111
  , simpleField "result"          [t| JSValue     |]
112
  , defaultField [| [] |] $
113
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
114
  , simpleField "priority"        [t| Int         |]
115
  , optionalNullSerField $
116
    simpleField "start_timestamp" [t| Timestamp   |]
117
  , optionalNullSerField $
118
    simpleField "exec_timestamp"  [t| Timestamp   |]
119
  , optionalNullSerField $
120
    simpleField "end_timestamp"   [t| Timestamp   |]
121
  ])
122

    
123
$(buildObject "QueuedJob" "qj"
124
  [ simpleField "id"                 [t| JobId          |]
125
  , simpleField "ops"                [t| [QueuedOpCode] |]
126
  , optionalNullSerField $
127
    simpleField "received_timestamp" [t| Timestamp      |]
128
  , optionalNullSerField $
129
    simpleField "start_timestamp"    [t| Timestamp      |]
130
  , optionalNullSerField $
131
    simpleField "end_timestamp"      [t| Timestamp      |]
132
  ])
133

    
134
-- | Job file prefix.
135
jobFilePrefix :: String
136
jobFilePrefix = "job-"
137

    
138
-- | Computes the filename for a given job ID.
139
jobFileName :: JobId -> FilePath
140
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
141

    
142
-- | Parses a job ID from a file name.
143
parseJobFileId :: (Monad m) => FilePath -> m JobId
144
parseJobFileId path =
145
  case stripPrefix jobFilePrefix path of
146
    Nothing -> fail $ "Job file '" ++ path ++
147
                      "' doesn't have the correct prefix"
148
    Just suffix -> makeJobIdS suffix
149

    
150
-- | Computes the full path to a live job.
151
liveJobFile :: FilePath -> JobId -> FilePath
152
liveJobFile rootdir jid = rootdir </> jobFileName jid
153

    
154
-- | Computes the full path to an archives job. BROKEN.
155
archivedJobFile :: FilePath -> JobId -> FilePath
156
archivedJobFile rootdir jid =
157
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
158
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
159

    
160
-- | Map from opcode status to job status.
161
opStatusToJob :: OpStatus -> JobStatus
162
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
163
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
164
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
165
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
166
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
167
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
168
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR
169

    
170
-- | Computes a queued job's status.
171
calcJobStatus :: QueuedJob -> JobStatus
172
calcJobStatus QueuedJob { qjOps = ops } =
173
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
174
    where
175
      terminalStatus OP_STATUS_ERROR     = True
176
      terminalStatus OP_STATUS_CANCELING = True
177
      terminalStatus OP_STATUS_CANCELED  = True
178
      terminalStatus _                   = False
179
      softStatus     OP_STATUS_SUCCESS   = True
180
      softStatus     OP_STATUS_QUEUED    = True
181
      softStatus     _                   = False
182
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
183
      extractOpSt [] d False = d
184
      extractOpSt (x:xs) d old_all
185
           | terminalStatus x = opStatusToJob x -- abort recursion
186
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
187
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
188
           where new_all = x == OP_STATUS_SUCCESS && old_all
189

    
190
-- | Determine whether an opcode status is finalized.
191
opStatusFinalized :: OpStatus -> Bool
192
opStatusFinalized = (> OP_STATUS_RUNNING)
193

    
194
-- | Compute a job's priority.
195
calcJobPriority :: QueuedJob -> Int
196
calcJobPriority QueuedJob { qjOps = ops } =
197
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
198
    where helper [] = C.opPrioDefault
199
          helper ps = minimum ps
200

    
201
-- | Log but ignore an 'IOError'.
202
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
203
ignoreIOError a ignore_noent msg e = do
204
  unless (isDoesNotExistError e && ignore_noent) .
205
    logWarning $ msg ++ ": " ++ show e
206
  return a
207

    
208
-- | Compute the list of existing archive directories. Note that I/O
209
-- exceptions are swallowed and ignored.
210
allArchiveDirs :: FilePath -> IO [FilePath]
211
allArchiveDirs rootdir = do
212
  let adir = rootdir </> jobQueueArchiveSubDir
213
  contents <- getDirectoryContents adir `Control.Exception.catch`
214
               ignoreIOError [] False
215
                 ("Failed to list queue directory " ++ adir)
216
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
217
  filterM (\path ->
218
             liftM isDirectory (getFileStatus (adir </> path))
219
               `Control.Exception.catch`
220
               ignoreIOError False True
221
                 ("Failed to stat archive path " ++ path)) fpaths
222

    
223
-- | Build list of directories containing job files. Note: compared to
224
-- the Python version, this doesn't ignore a potential lost+found
225
-- file.
226
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
227
determineJobDirectories rootdir archived = do
228
  other <- if archived
229
             then allArchiveDirs rootdir
230
             else return []
231
  return $ rootdir:other
232

    
233
-- Function equivalent to the \'sequence\' function, that cannot be used because
234
-- of library version conflict on Lucid.
235
-- FIXME: delete this and just use \'sequence\' instead when Lucid compatibility
236
-- will not be required anymore.
237
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
238
sequencer l = fmap reverse $ foldl seqFolder (Right []) l
239

    
240
-- | Folding function for joining multiple [JobIds] into one list.
241
seqFolder :: Either IOError [[JobId]]
242
          -> Either IOError [JobId]
243
          -> Either IOError [[JobId]]
244
seqFolder (Left e) _ = Left e
245
seqFolder (Right _) (Left e) = Left e
246
seqFolder (Right l) (Right el) = Right $ el:l
247

    
248
-- | Computes the list of all jobs in the given directories.
249
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
250
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)
251

    
252
-- | Sorts the a list of job IDs.
253
sortJobIDs :: [JobId] -> [JobId]
254
sortJobIDs = sortBy (comparing fromJobId)
255

    
256
-- | Computes the list of jobs in a given directory.
257
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
258
getDirJobIDs path = do
259
  either_contents <-
260
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
261
  case either_contents of
262
    Left e -> do
263
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
264
      return $ Left e
265
    Right contents -> do
266
      let jids = foldl (\ids file ->
267
                         case parseJobFileId file of
268
                           Nothing -> ids
269
                           Just new_id -> new_id:ids) [] contents
270
      return . Right $ reverse jids
271

    
272
-- | Reads the job data from disk.
273
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
274
readJobDataFromDisk rootdir archived jid = do
275
  let live_path = liveJobFile rootdir jid
276
      archived_path = archivedJobFile rootdir jid
277
      all_paths = if archived
278
                    then [(live_path, False), (archived_path, True)]
279
                    else [(live_path, False)]
280
  foldM (\state (path, isarchived) ->
281
           liftM (\r -> Just (r, isarchived)) (readFile path)
282
             `Control.Exception.catch`
283
             ignoreIOError state True
284
               ("Failed to read job file " ++ path)) Nothing all_paths
285

    
286
-- | Failed to load job error.
287
noSuchJob :: Result (QueuedJob, Bool)
288
noSuchJob = Bad "Can't load job file"
289

    
290
-- | Loads a job from disk.
291
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
292
loadJobFromDisk rootdir archived jid = do
293
  raw <- readJobDataFromDisk rootdir archived jid
294
  -- note: we need some stricness below, otherwise the wrapping in a
295
  -- Result will create too much lazyness, and not close the file
296
  -- descriptors for the individual jobs
297
  return $! case raw of
298
             Nothing -> noSuchJob
299
             Just (str, arch) ->
300
               liftM (\qj -> (qj, arch)) .
301
               fromJResult "Parsing job file" $ Text.JSON.decode str