Rename htools/ to src/
[ganeti-local] / src / Ganeti / JQueue.hs
1 {-# LANGUAGE TemplateHaskell #-}
2
3 {-| Implementation of the job queue.
4
5 -}
6
7 {-
8
9 Copyright (C) 2010, 2012 Google Inc.
10
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
15
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 02110-1301, USA.
25
26 -}
27
28 module Ganeti.JQueue
29     ( QueuedOpCode(..)
30     , QueuedJob(..)
31     , InputOpCode(..)
32     , Timestamp
33     , noTimestamp
34     , opStatusFinalized
35     , extractOpSummary
36     , calcJobStatus
37     , calcJobPriority
38     , jobFileName
39     , liveJobFile
40     , archivedJobFile
41     , determineJobDirectories
42     , getJobIDs
43     , sortJobIDs
44     , loadJobFromDisk
45     , noSuchJob
46     ) where
47
48 import Control.Exception
49 import Control.Monad
50 import Data.List
51 import Data.Ord (comparing)
52 -- workaround what seems to be a bug in ghc 7.4's TH shadowing code
53 import Prelude hiding (log, id)
54 import System.Directory
55 import System.FilePath
56 import System.IO.Error (isDoesNotExistError)
57 import System.Posix.Files
58 import qualified Text.JSON
59 import Text.JSON.Types
60
61 import Ganeti.BasicTypes
62 import qualified Ganeti.Constants as C
63 import Ganeti.JSON
64 import Ganeti.Logging
65 import Ganeti.OpCodes
66 import Ganeti.Path
67 import Ganeti.THH
68 import Ganeti.Types
69
70 -- * Data types
71
72 -- | The ganeti queue timestamp type
73 type Timestamp = (Int, Int)
74
75 -- | Missing timestamp type.
76 noTimestamp :: Timestamp
77 noTimestamp = (-1, -1)
78
79 -- | An input opcode.
80 data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
81                  | InvalidOpCode JSValue  -- ^ Invalid opcode
82                    deriving (Show, Eq)
83
84 -- | JSON instance for 'InputOpCode', trying to parse it and if
85 -- failing, keeping the original JSValue.
86 instance Text.JSON.JSON InputOpCode where
87   showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
88   showJSON (InvalidOpCode inv) = inv
89   readJSON v = case Text.JSON.readJSON v of
90                  Text.JSON.Error _ -> return $ InvalidOpCode v
91                  Text.JSON.Ok mo -> return $ ValidOpCode mo
92
93 -- | Invalid opcode summary.
94 invalidOp :: String
95 invalidOp = "INVALID_OP"
96
97 -- | Tries to extract the opcode summary from an 'InputOpCode'. This
98 -- duplicates some functionality from the 'opSummary' function in
99 -- "Ganeti.OpCodes".
100 extractOpSummary :: InputOpCode -> String
101 extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
102 extractOpSummary (InvalidOpCode (JSObject o)) =
103   case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
104     Just s -> drop 3 s -- drop the OP_ prefix
105     Nothing -> invalidOp
106 extractOpSummary _ = invalidOp
107
108 $(buildObject "QueuedOpCode" "qo"
109   [ simpleField "input"           [t| InputOpCode |]
110   , simpleField "status"          [t| OpStatus    |]
111   , simpleField "result"          [t| JSValue     |]
112   , defaultField [| [] |] $
113     simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
114   , simpleField "priority"        [t| Int         |]
115   , optionalNullSerField $
116     simpleField "start_timestamp" [t| Timestamp   |]
117   , optionalNullSerField $
118     simpleField "exec_timestamp"  [t| Timestamp   |]
119   , optionalNullSerField $
120     simpleField "end_timestamp"   [t| Timestamp   |]
121   ])
122
123 $(buildObject "QueuedJob" "qj"
124   [ simpleField "id"                 [t| JobId          |]
125   , simpleField "ops"                [t| [QueuedOpCode] |]
126   , optionalNullSerField $
127     simpleField "received_timestamp" [t| Timestamp      |]
128   , optionalNullSerField $
129     simpleField "start_timestamp"    [t| Timestamp      |]
130   , optionalNullSerField $
131     simpleField "end_timestamp"      [t| Timestamp      |]
132   ])
133
134 -- | Job file prefix.
135 jobFilePrefix :: String
136 jobFilePrefix = "job-"
137
138 -- | Computes the filename for a given job ID.
139 jobFileName :: JobId -> FilePath
140 jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
141
142 -- | Parses a job ID from a file name.
143 parseJobFileId :: (Monad m) => FilePath -> m JobId
144 parseJobFileId path =
145   case stripPrefix jobFilePrefix path of
146     Nothing -> fail $ "Job file '" ++ path ++
147                       "' doesn't have the correct prefix"
148     Just suffix -> makeJobIdS suffix
149
150 -- | Computes the full path to a live job.
151 liveJobFile :: FilePath -> JobId -> FilePath
152 liveJobFile rootdir jid = rootdir </> jobFileName jid
153
154 -- | Computes the full path to an archives job. BROKEN.
155 archivedJobFile :: FilePath -> JobId -> FilePath
156 archivedJobFile rootdir jid =
157   let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
158   in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
159
160 -- | Map from opcode status to job status.
161 opStatusToJob :: OpStatus -> JobStatus
162 opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
163 opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
164 opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
165 opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
166 opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
167 opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
168 opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR
169
170 -- | Computes a queued job's status.
171 calcJobStatus :: QueuedJob -> JobStatus
172 calcJobStatus QueuedJob { qjOps = ops } =
173   extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
174     where
175       terminalStatus OP_STATUS_ERROR     = True
176       terminalStatus OP_STATUS_CANCELING = True
177       terminalStatus OP_STATUS_CANCELED  = True
178       terminalStatus _                   = False
179       softStatus     OP_STATUS_SUCCESS   = True
180       softStatus     OP_STATUS_QUEUED    = True
181       softStatus     _                   = False
182       extractOpSt [] _ True = JOB_STATUS_SUCCESS
183       extractOpSt [] d False = d
184       extractOpSt (x:xs) d old_all
185            | terminalStatus x = opStatusToJob x -- abort recursion
186            | softStatus x     = extractOpSt xs d new_all -- continue unchanged
187            | otherwise        = extractOpSt xs (opStatusToJob x) new_all
188            where new_all = x == OP_STATUS_SUCCESS && old_all
189
190 -- | Determine whether an opcode status is finalized.
191 opStatusFinalized :: OpStatus -> Bool
192 opStatusFinalized = (> OP_STATUS_RUNNING)
193
194 -- | Compute a job's priority.
195 calcJobPriority :: QueuedJob -> Int
196 calcJobPriority QueuedJob { qjOps = ops } =
197   helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
198     where helper [] = C.opPrioDefault
199           helper ps = minimum ps
200
201 -- | Log but ignore an 'IOError'.
202 ignoreIOError :: a -> Bool -> String -> IOError -> IO a
203 ignoreIOError a ignore_noent msg e = do
204   unless (isDoesNotExistError e && ignore_noent) .
205     logWarning $ msg ++ ": " ++ show e
206   return a
207
208 -- | Compute the list of existing archive directories. Note that I/O
209 -- exceptions are swallowed and ignored.
210 allArchiveDirs :: FilePath -> IO [FilePath]
211 allArchiveDirs rootdir = do
212   let adir = rootdir </> jobQueueArchiveSubDir
213   contents <- getDirectoryContents adir `Control.Exception.catch`
214                ignoreIOError [] False
215                  ("Failed to list queue directory " ++ adir)
216   let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
217   filterM (\path ->
218              liftM isDirectory (getFileStatus (adir </> path))
219                `Control.Exception.catch`
220                ignoreIOError False True
221                  ("Failed to stat archive path " ++ path)) fpaths
222
223 -- | Build list of directories containing job files. Note: compared to
224 -- the Python version, this doesn't ignore a potential lost+found
225 -- file.
226 determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
227 determineJobDirectories rootdir archived = do
228   other <- if archived
229              then allArchiveDirs rootdir
230              else return []
231   return $ rootdir:other
232
233 -- | Computes the list of all jobs in the given directories.
234 getJobIDs :: [FilePath] -> IO [JobId]
235 getJobIDs = liftM concat . mapM getDirJobIDs
236
237 -- | Sorts the a list of job IDs.
238 sortJobIDs :: [JobId] -> [JobId]
239 sortJobIDs = sortBy (comparing fromJobId)
240
241 -- | Computes the list of jobs in a given directory.
242 getDirJobIDs :: FilePath -> IO [JobId]
243 getDirJobIDs path = do
244   contents <- getDirectoryContents path `Control.Exception.catch`
245                 ignoreIOError [] False
246                   ("Failed to list job directory " ++ path)
247   let jids = foldl (\ids file ->
248                       case parseJobFileId file of
249                         Nothing -> ids
250                         Just new_id -> new_id:ids) [] contents
251   return $ reverse jids
252
253 -- | Reads the job data from disk.
254 readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
255 readJobDataFromDisk rootdir archived jid = do
256   let live_path = liveJobFile rootdir jid
257       archived_path = archivedJobFile rootdir jid
258       all_paths = if archived
259                     then [(live_path, False), (archived_path, True)]
260                     else [(live_path, False)]
261   foldM (\state (path, isarchived) ->
262            liftM (\r -> Just (r, isarchived)) (readFile path)
263              `Control.Exception.catch`
264              ignoreIOError state True
265                ("Failed to read job file " ++ path)) Nothing all_paths
266
267 -- | Failed to load job error.
268 noSuchJob :: Result (QueuedJob, Bool)
269 noSuchJob = Bad "Can't load job file"
270
271 -- | Loads a job from disk.
272 loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
273 loadJobFromDisk rootdir archived jid = do
274   raw <- readJobDataFromDisk rootdir archived jid
275   -- note: we need some stricness below, otherwise the wrapping in a
276   -- Result will create too much lazyness, and not close the file
277   -- descriptors for the individual jobs
278   return $! case raw of
279              Nothing -> noSuchJob
280              Just (str, arch) ->
281                liftM (\qj -> (qj, arch)) .
282                fromJResult "Parsing job file" $ Text.JSON.decode str