Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ 1c1132f4

History | View | Annotate | Download (11.9 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the job queue.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2010, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.JQueue
29
    ( QueuedOpCode(..)
30
    , QueuedJob(..)
31
    , InputOpCode(..)
32
    , queuedOpCodeFromMetaOpCode
33
    , queuedJobFromOpCodes
34
    , Timestamp
35
    , noTimestamp
36
    , opStatusFinalized
37
    , extractOpSummary
38
    , calcJobStatus
39
    , calcJobPriority
40
    , jobFileName
41
    , liveJobFile
42
    , archivedJobFile
43
    , determineJobDirectories
44
    , getJobIDs
45
    , sortJobIDs
46
    , loadJobFromDisk
47
    , noSuchJob
48
    ) where
49

    
50
import Control.Exception
51
import Control.Monad
52
import Data.List
53
import Data.Maybe
54
import Data.Ord (comparing)
55
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
56
import Prelude hiding (log, id)
57
import System.Directory
58
import System.FilePath
59
import System.IO.Error (isDoesNotExistError)
60
import System.Posix.Files
61
import qualified Text.JSON
62
import Text.JSON.Types
63

    
64
import Ganeti.BasicTypes
65
import qualified Ganeti.Constants as C
66
import Ganeti.JSON
67
import Ganeti.Logging
68
import Ganeti.OpCodes
69
import Ganeti.Path
70
import Ganeti.THH
71
import Ganeti.Types
72

    
73
-- * Data types
74

    
75
-- | The ganeti queue timestamp type
76
type Timestamp = (Int, Int)
77

    
78
-- | Missing timestamp type.
79
noTimestamp :: Timestamp
80
noTimestamp = (-1, -1)
81

    
82
-- | An input opcode.
83
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
84
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
85
                   deriving (Show, Eq)
86

    
87
-- | JSON instance for 'InputOpCode', trying to parse it and if
88
-- failing, keeping the original JSValue.
89
instance Text.JSON.JSON InputOpCode where
90
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
91
  showJSON (InvalidOpCode inv) = inv
92
  readJSON v = case Text.JSON.readJSON v of
93
                 Text.JSON.Error _ -> return $ InvalidOpCode v
94
                 Text.JSON.Ok mo -> return $ ValidOpCode mo
95

    
96
-- | Invalid opcode summary.
97
invalidOp :: String
98
invalidOp = "INVALID_OP"
99

    
100
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
101
-- duplicates some functionality from the 'opSummary' function in
102
-- "Ganeti.OpCodes".
103
extractOpSummary :: InputOpCode -> String
104
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
105
extractOpSummary (InvalidOpCode (JSObject o)) =
106
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
107
    Just s -> drop 3 s -- drop the OP_ prefix
108
    Nothing -> invalidOp
109
extractOpSummary _ = invalidOp
110

    
111
$(buildObject "QueuedOpCode" "qo"
112
  [ simpleField "input"           [t| InputOpCode |]
113
  , simpleField "status"          [t| OpStatus    |]
114
  , simpleField "result"          [t| JSValue     |]
115
  , defaultField [| [] |] $
116
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
117
  , simpleField "priority"        [t| Int         |]
118
  , optionalNullSerField $
119
    simpleField "start_timestamp" [t| Timestamp   |]
120
  , optionalNullSerField $
121
    simpleField "exec_timestamp"  [t| Timestamp   |]
122
  , optionalNullSerField $
123
    simpleField "end_timestamp"   [t| Timestamp   |]
124
  ])
125

    
126
$(buildObject "QueuedJob" "qj"
127
  [ simpleField "id"                 [t| JobId          |]
128
  , simpleField "ops"                [t| [QueuedOpCode] |]
129
  , optionalNullSerField $
130
    simpleField "received_timestamp" [t| Timestamp      |]
131
  , optionalNullSerField $
132
    simpleField "start_timestamp"    [t| Timestamp      |]
133
  , optionalNullSerField $
134
    simpleField "end_timestamp"      [t| Timestamp      |]
135
  ])
136

    
137
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
138
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
139
queuedOpCodeFromMetaOpCode op =
140
  QueuedOpCode { qoInput = ValidOpCode op
141
               , qoStatus = OP_STATUS_QUEUED
142
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
143
                              $ op
144
               , qoLog = []
145
               , qoResult = JSNull
146
               , qoStartTimestamp = Nothing
147
               , qoEndTimestamp = Nothing
148
               , qoExecTimestamp = Nothing
149
               }
150

    
151
-- | From a job-id and a list of op-codes create a job. This is
152
-- the pure part of job creation, as allocating a new job id
153
-- lives in IO.
154
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
155
queuedJobFromOpCodes jobid ops = do
156
  ops' <- mapM (`resolveDependencies` jobid) ops
157
  return QueuedJob { qjId = jobid
158
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
159
                   , qjReceivedTimestamp = Nothing 
160
                   , qjStartTimestamp = Nothing
161
                   , qjEndTimestamp = Nothing
162
                   }
163

    
164
-- | Job file prefix.
165
jobFilePrefix :: String
166
jobFilePrefix = "job-"
167

    
168
-- | Computes the filename for a given job ID.
169
jobFileName :: JobId -> FilePath
170
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
171

    
172
-- | Parses a job ID from a file name.
173
parseJobFileId :: (Monad m) => FilePath -> m JobId
174
parseJobFileId path =
175
  case stripPrefix jobFilePrefix path of
176
    Nothing -> fail $ "Job file '" ++ path ++
177
                      "' doesn't have the correct prefix"
178
    Just suffix -> makeJobIdS suffix
179

    
180
-- | Computes the full path to a live job.
181
liveJobFile :: FilePath -> JobId -> FilePath
182
liveJobFile rootdir jid = rootdir </> jobFileName jid
183

    
184
-- | Computes the full path to an archives job. BROKEN.
185
archivedJobFile :: FilePath -> JobId -> FilePath
186
archivedJobFile rootdir jid =
187
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
188
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
189

    
190
-- | Map from opcode status to job status.
191
opStatusToJob :: OpStatus -> JobStatus
192
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
193
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
194
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
195
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
196
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
197
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
198
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR
199

    
200
-- | Computes a queued job's status.
201
calcJobStatus :: QueuedJob -> JobStatus
202
calcJobStatus QueuedJob { qjOps = ops } =
203
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
204
    where
205
      terminalStatus OP_STATUS_ERROR     = True
206
      terminalStatus OP_STATUS_CANCELING = True
207
      terminalStatus OP_STATUS_CANCELED  = True
208
      terminalStatus _                   = False
209
      softStatus     OP_STATUS_SUCCESS   = True
210
      softStatus     OP_STATUS_QUEUED    = True
211
      softStatus     _                   = False
212
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
213
      extractOpSt [] d False = d
214
      extractOpSt (x:xs) d old_all
215
           | terminalStatus x = opStatusToJob x -- abort recursion
216
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
217
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
218
           where new_all = x == OP_STATUS_SUCCESS && old_all
219

    
220
-- | Determine whether an opcode status is finalized.
221
opStatusFinalized :: OpStatus -> Bool
222
opStatusFinalized = (> OP_STATUS_RUNNING)
223

    
224
-- | Compute a job's priority.
225
calcJobPriority :: QueuedJob -> Int
226
calcJobPriority QueuedJob { qjOps = ops } =
227
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
228
    where helper [] = C.opPrioDefault
229
          helper ps = minimum ps
230

    
231
-- | Log but ignore an 'IOError'.
232
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
233
ignoreIOError a ignore_noent msg e = do
234
  unless (isDoesNotExistError e && ignore_noent) .
235
    logWarning $ msg ++ ": " ++ show e
236
  return a
237

    
238
-- | Compute the list of existing archive directories. Note that I/O
239
-- exceptions are swallowed and ignored.
240
allArchiveDirs :: FilePath -> IO [FilePath]
241
allArchiveDirs rootdir = do
242
  let adir = rootdir </> jobQueueArchiveSubDir
243
  contents <- getDirectoryContents adir `Control.Exception.catch`
244
               ignoreIOError [] False
245
                 ("Failed to list queue directory " ++ adir)
246
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
247
  filterM (\path ->
248
             liftM isDirectory (getFileStatus (adir </> path))
249
               `Control.Exception.catch`
250
               ignoreIOError False True
251
                 ("Failed to stat archive path " ++ path)) fpaths
252

    
253
-- | Build list of directories containing job files. Note: compared to
254
-- the Python version, this doesn't ignore a potential lost+found
255
-- file.
256
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
257
determineJobDirectories rootdir archived = do
258
  other <- if archived
259
             then allArchiveDirs rootdir
260
             else return []
261
  return $ rootdir:other
262

    
263
-- Function equivalent to the \'sequence\' function, that cannot be used because
264
-- of library version conflict on Lucid.
265
-- FIXME: delete this and just use \'sequence\' instead when Lucid compatibility
266
-- will not be required anymore.
267
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
268
sequencer l = fmap reverse $ foldl seqFolder (Right []) l
269

    
270
-- | Folding function for joining multiple [JobIds] into one list.
271
seqFolder :: Either IOError [[JobId]]
272
          -> Either IOError [JobId]
273
          -> Either IOError [[JobId]]
274
seqFolder (Left e) _ = Left e
275
seqFolder (Right _) (Left e) = Left e
276
seqFolder (Right l) (Right el) = Right $ el:l
277

    
278
-- | Computes the list of all jobs in the given directories.
279
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
280
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)
281

    
282
-- | Sorts the a list of job IDs.
283
sortJobIDs :: [JobId] -> [JobId]
284
sortJobIDs = sortBy (comparing fromJobId)
285

    
286
-- | Computes the list of jobs in a given directory.
287
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
288
getDirJobIDs path = do
289
  either_contents <-
290
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
291
  case either_contents of
292
    Left e -> do
293
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
294
      return $ Left e
295
    Right contents -> do
296
      let jids = foldl (\ids file ->
297
                         case parseJobFileId file of
298
                           Nothing -> ids
299
                           Just new_id -> new_id:ids) [] contents
300
      return . Right $ reverse jids
301

    
302
-- | Reads the job data from disk.
303
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
304
readJobDataFromDisk rootdir archived jid = do
305
  let live_path = liveJobFile rootdir jid
306
      archived_path = archivedJobFile rootdir jid
307
      all_paths = if archived
308
                    then [(live_path, False), (archived_path, True)]
309
                    else [(live_path, False)]
310
  foldM (\state (path, isarchived) ->
311
           liftM (\r -> Just (r, isarchived)) (readFile path)
312
             `Control.Exception.catch`
313
             ignoreIOError state True
314
               ("Failed to read job file " ++ path)) Nothing all_paths
315

    
316
-- | Failed to load job error.
317
noSuchJob :: Result (QueuedJob, Bool)
318
noSuchJob = Bad "Can't load job file"
319

    
320
-- | Loads a job from disk.
321
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
322
loadJobFromDisk rootdir archived jid = do
323
  raw <- readJobDataFromDisk rootdir archived jid
324
  -- note: we need some stricness below, otherwise the wrapping in a
325
  -- Result will create too much lazyness, and not close the file
326
  -- descriptors for the individual jobs
327
  return $! case raw of
328
             Nothing -> noSuchJob
329
             Just (str, arch) ->
330
               liftM (\qj -> (qj, arch)) .
331
               fromJResult "Parsing job file" $ Text.JSON.decode str