Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ f7819050

History | View | Annotate | Download (14.3 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the job queue.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2010, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.JQueue
29
    ( QueuedOpCode(..)
30
    , QueuedJob(..)
31
    , InputOpCode(..)
32
    , queuedOpCodeFromMetaOpCode
33
    , queuedJobFromOpCodes
34
    , Timestamp
35
    , noTimestamp
36
    , opStatusFinalized
37
    , extractOpSummary
38
    , calcJobStatus
39
    , calcJobPriority
40
    , jobFileName
41
    , liveJobFile
42
    , archivedJobFile
43
    , determineJobDirectories
44
    , getJobIDs
45
    , sortJobIDs
46
    , loadJobFromDisk
47
    , noSuchJob
48
    , readSerialFromDisk
49
    , allocateJobIds
50
    , allocateJobId
51
    , writeJobToDisk
52
    , isQueueOpen
53
    ) where
54

    
55
import Control.Concurrent.MVar
56
import Control.Exception
57
import Control.Monad
58
import Data.List
59
import Data.Maybe
60
import Data.Ord (comparing)
61
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
62
import Prelude hiding (log, id)
63
import System.Directory
64
import System.FilePath
65
import System.IO.Error (isDoesNotExistError)
66
import System.Posix.Files
67
import qualified Text.JSON
68
import Text.JSON.Types
69

    
70
import Ganeti.BasicTypes
71
import qualified Ganeti.Constants as C
72
import Ganeti.JSON
73
import Ganeti.Logging
74
import Ganeti.Objects (Node)
75
import Ganeti.OpCodes
76
import Ganeti.Path
77
import Ganeti.Rpc (executeRpcCall, RpcCallJobqueueUpdate(..))
78
import Ganeti.THH
79
import Ganeti.Types
80
import Ganeti.Utils
81

    
82
-- * Data types
83

    
84
-- | The ganeti queue timestamp type
85
type Timestamp = (Int, Int)
86

    
87
-- | Missing timestamp type.
88
noTimestamp :: Timestamp
89
noTimestamp = (-1, -1)
90

    
91
-- | An input opcode.
92
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
93
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
94
                   deriving (Show, Eq)
95

    
96
-- | JSON instance for 'InputOpCode', trying to parse it and if
97
-- failing, keeping the original JSValue.
98
instance Text.JSON.JSON InputOpCode where
99
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
100
  showJSON (InvalidOpCode inv) = inv
101
  readJSON v = case Text.JSON.readJSON v of
102
                 Text.JSON.Error _ -> return $ InvalidOpCode v
103
                 Text.JSON.Ok mo -> return $ ValidOpCode mo
104

    
105
-- | Invalid opcode summary.
106
invalidOp :: String
107
invalidOp = "INVALID_OP"
108

    
109
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
110
-- duplicates some functionality from the 'opSummary' function in
111
-- "Ganeti.OpCodes".
112
extractOpSummary :: InputOpCode -> String
113
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
114
extractOpSummary (InvalidOpCode (JSObject o)) =
115
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
116
    Just s -> drop 3 s -- drop the OP_ prefix
117
    Nothing -> invalidOp
118
extractOpSummary _ = invalidOp
119

    
120
$(buildObject "QueuedOpCode" "qo"
121
  [ simpleField "input"           [t| InputOpCode |]
122
  , simpleField "status"          [t| OpStatus    |]
123
  , simpleField "result"          [t| JSValue     |]
124
  , defaultField [| [] |] $
125
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
126
  , simpleField "priority"        [t| Int         |]
127
  , optionalNullSerField $
128
    simpleField "start_timestamp" [t| Timestamp   |]
129
  , optionalNullSerField $
130
    simpleField "exec_timestamp"  [t| Timestamp   |]
131
  , optionalNullSerField $
132
    simpleField "end_timestamp"   [t| Timestamp   |]
133
  ])
134

    
135
$(buildObject "QueuedJob" "qj"
136
  [ simpleField "id"                 [t| JobId          |]
137
  , simpleField "ops"                [t| [QueuedOpCode] |]
138
  , optionalNullSerField $
139
    simpleField "received_timestamp" [t| Timestamp      |]
140
  , optionalNullSerField $
141
    simpleField "start_timestamp"    [t| Timestamp      |]
142
  , optionalNullSerField $
143
    simpleField "end_timestamp"      [t| Timestamp      |]
144
  ])
145

    
146
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
147
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
148
queuedOpCodeFromMetaOpCode op =
149
  QueuedOpCode { qoInput = ValidOpCode op
150
               , qoStatus = OP_STATUS_QUEUED
151
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
152
                              $ op
153
               , qoLog = []
154
               , qoResult = JSNull
155
               , qoStartTimestamp = Nothing
156
               , qoEndTimestamp = Nothing
157
               , qoExecTimestamp = Nothing
158
               }
159

    
160
-- | From a job-id and a list of op-codes create a job. This is
161
-- the pure part of job creation, as allocating a new job id
162
-- lives in IO.
163
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
164
queuedJobFromOpCodes jobid ops = do
165
  ops' <- mapM (`resolveDependencies` jobid) ops
166
  return QueuedJob { qjId = jobid
167
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
168
                   , qjReceivedTimestamp = Nothing 
169
                   , qjStartTimestamp = Nothing
170
                   , qjEndTimestamp = Nothing
171
                   }
172

    
173
-- | Job file prefix.
174
jobFilePrefix :: String
175
jobFilePrefix = "job-"
176

    
177
-- | Computes the filename for a given job ID.
178
jobFileName :: JobId -> FilePath
179
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
180

    
181
-- | Parses a job ID from a file name.
182
parseJobFileId :: (Monad m) => FilePath -> m JobId
183
parseJobFileId path =
184
  case stripPrefix jobFilePrefix path of
185
    Nothing -> fail $ "Job file '" ++ path ++
186
                      "' doesn't have the correct prefix"
187
    Just suffix -> makeJobIdS suffix
188

    
189
-- | Computes the full path to a live job.
190
liveJobFile :: FilePath -> JobId -> FilePath
191
liveJobFile rootdir jid = rootdir </> jobFileName jid
192

    
193
-- | Computes the full path to an archives job. BROKEN.
194
archivedJobFile :: FilePath -> JobId -> FilePath
195
archivedJobFile rootdir jid =
196
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
197
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
198

    
199
-- | Map from opcode status to job status.
200
opStatusToJob :: OpStatus -> JobStatus
201
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
202
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
203
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
204
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
205
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
206
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
207
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR
208

    
209
-- | Computes a queued job's status.
210
calcJobStatus :: QueuedJob -> JobStatus
211
calcJobStatus QueuedJob { qjOps = ops } =
212
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
213
    where
214
      terminalStatus OP_STATUS_ERROR     = True
215
      terminalStatus OP_STATUS_CANCELING = True
216
      terminalStatus OP_STATUS_CANCELED  = True
217
      terminalStatus _                   = False
218
      softStatus     OP_STATUS_SUCCESS   = True
219
      softStatus     OP_STATUS_QUEUED    = True
220
      softStatus     _                   = False
221
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
222
      extractOpSt [] d False = d
223
      extractOpSt (x:xs) d old_all
224
           | terminalStatus x = opStatusToJob x -- abort recursion
225
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
226
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
227
           where new_all = x == OP_STATUS_SUCCESS && old_all
228

    
229
-- | Determine whether an opcode status is finalized.
230
opStatusFinalized :: OpStatus -> Bool
231
opStatusFinalized = (> OP_STATUS_RUNNING)
232

    
233
-- | Compute a job's priority.
234
calcJobPriority :: QueuedJob -> Int
235
calcJobPriority QueuedJob { qjOps = ops } =
236
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
237
    where helper [] = C.opPrioDefault
238
          helper ps = minimum ps
239

    
240
-- | Log but ignore an 'IOError'.
241
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
242
ignoreIOError a ignore_noent msg e = do
243
  unless (isDoesNotExistError e && ignore_noent) .
244
    logWarning $ msg ++ ": " ++ show e
245
  return a
246

    
247
-- | Compute the list of existing archive directories. Note that I/O
248
-- exceptions are swallowed and ignored.
249
allArchiveDirs :: FilePath -> IO [FilePath]
250
allArchiveDirs rootdir = do
251
  let adir = rootdir </> jobQueueArchiveSubDir
252
  contents <- getDirectoryContents adir `Control.Exception.catch`
253
               ignoreIOError [] False
254
                 ("Failed to list queue directory " ++ adir)
255
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
256
  filterM (\path ->
257
             liftM isDirectory (getFileStatus (adir </> path))
258
               `Control.Exception.catch`
259
               ignoreIOError False True
260
                 ("Failed to stat archive path " ++ path)) fpaths
261

    
262
-- | Build list of directories containing job files. Note: compared to
263
-- the Python version, this doesn't ignore a potential lost+found
264
-- file.
265
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
266
determineJobDirectories rootdir archived = do
267
  other <- if archived
268
             then allArchiveDirs rootdir
269
             else return []
270
  return $ rootdir:other
271

    
272
-- Function equivalent to the \'sequence\' function, that cannot be used because
273
-- of library version conflict on Lucid.
274
-- FIXME: delete this and just use \'sequence\' instead when Lucid compatibility
275
-- will not be required anymore.
276
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
277
sequencer l = fmap reverse $ foldl seqFolder (Right []) l
278

    
279
-- | Folding function for joining multiple [JobIds] into one list.
280
seqFolder :: Either IOError [[JobId]]
281
          -> Either IOError [JobId]
282
          -> Either IOError [[JobId]]
283
seqFolder (Left e) _ = Left e
284
seqFolder (Right _) (Left e) = Left e
285
seqFolder (Right l) (Right el) = Right $ el:l
286

    
287
-- | Computes the list of all jobs in the given directories.
288
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
289
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)
290

    
291
-- | Sorts the a list of job IDs.
292
sortJobIDs :: [JobId] -> [JobId]
293
sortJobIDs = sortBy (comparing fromJobId)
294

    
295
-- | Computes the list of jobs in a given directory.
296
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
297
getDirJobIDs path = do
298
  either_contents <-
299
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
300
  case either_contents of
301
    Left e -> do
302
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
303
      return $ Left e
304
    Right contents -> do
305
      let jids = foldl (\ids file ->
306
                         case parseJobFileId file of
307
                           Nothing -> ids
308
                           Just new_id -> new_id:ids) [] contents
309
      return . Right $ reverse jids
310

    
311
-- | Reads the job data from disk.
312
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
313
readJobDataFromDisk rootdir archived jid = do
314
  let live_path = liveJobFile rootdir jid
315
      archived_path = archivedJobFile rootdir jid
316
      all_paths = if archived
317
                    then [(live_path, False), (archived_path, True)]
318
                    else [(live_path, False)]
319
  foldM (\state (path, isarchived) ->
320
           liftM (\r -> Just (r, isarchived)) (readFile path)
321
             `Control.Exception.catch`
322
             ignoreIOError state True
323
               ("Failed to read job file " ++ path)) Nothing all_paths
324

    
325
-- | Failed to load job error.
326
noSuchJob :: Result (QueuedJob, Bool)
327
noSuchJob = Bad "Can't load job file"
328

    
329
-- | Loads a job from disk.
330
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
331
loadJobFromDisk rootdir archived jid = do
332
  raw <- readJobDataFromDisk rootdir archived jid
333
  -- note: we need some stricness below, otherwise the wrapping in a
334
  -- Result will create too much lazyness, and not close the file
335
  -- descriptors for the individual jobs
336
  return $! case raw of
337
             Nothing -> noSuchJob
338
             Just (str, arch) ->
339
               liftM (\qj -> (qj, arch)) .
340
               fromJResult "Parsing job file" $ Text.JSON.decode str
341

    
342
-- | Write a job to disk.
343
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
344
writeJobToDisk rootdir job = do
345
  let filename = liveJobFile rootdir . qjId $ job
346
      content = Text.JSON.encode . Text.JSON.showJSON $ job
347
  tryAndLogIOError (atomicWriteFile filename content)
348
                   ("Failed to write " ++ filename) Ok
349

    
350
-- | Read the job serial number from disk.
351
readSerialFromDisk :: IO (Result JobId)
352
readSerialFromDisk = do
353
  filename <- jobQueueSerialFile
354
  tryAndLogIOError (readFile filename) "Failed to read serial file"
355
                   (makeJobIdS . rStripSpace)
356

    
357
-- | Allocate new job ids.
358
-- To avoid races while accessing the serial file, the threads synchronize
359
-- over a lock, as usual provided by an MVar.
360
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
361
allocateJobIds mastercandidates lock n =
362
  if n <= 0
363
    then return . Bad $ "Can only allocate positive number of job ids"
364
    else do
365
      takeMVar lock
366
      rjobid <- readSerialFromDisk
367
      case rjobid of
368
        Bad s -> do
369
          putMVar lock ()
370
          return . Bad $ s
371
        Ok jid -> do
372
          let current = fromJobId jid
373
              serial_content = show (current + n) ++  "\n"
374
          serial <- jobQueueSerialFile
375
          write_result <- try $ atomicWriteFile serial serial_content
376
                          :: IO (Either IOError ())
377
          case write_result of
378
            Left e -> do
379
              putMVar lock ()
380
              let msg = "Failed to write serial file: " ++ show e
381
              logError msg
382
              return . Bad $ msg 
383
            Right () -> do
384
              _ <- executeRpcCall mastercandidates
385
                     $ RpcCallJobqueueUpdate serial serial_content
386
              putMVar lock ()
387
              return $ mapM makeJobId [(current+1)..(current+n)]
388

    
389
-- | Allocate one new job id.
390
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
391
allocateJobId mastercandidates lock = do
392
  jids <- allocateJobIds mastercandidates lock 1
393
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")
394

    
395
-- | Decide if job queue is open
396
isQueueOpen :: IO Bool
397
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)