{-# LANGUAGE TemplateHaskell #-}

{-| Implementation of the job queue.

-}

{-

Copyright (C) 2010, 2012 Google Inc.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.

-}

module Ganeti.JQueue
    ( QueuedOpCode(..)
    , QueuedJob(..)
    , InputOpCode(..)
    , queuedOpCodeFromMetaOpCode
    , queuedJobFromOpCodes
    , Timestamp
    , noTimestamp
    , opStatusFinalized
    , extractOpSummary
    , calcJobStatus
    , calcJobPriority
    , jobFileName
    , liveJobFile
    , archivedJobFile
    , determineJobDirectories
    , getJobIDs
    , sortJobIDs
    , loadJobFromDisk
    , noSuchJob
    , readSerialFromDisk
    , allocateJobIds
    , allocateJobId
    , writeJobToDisk
    , replicateManyJobs
    , isQueueOpen
    ) where

import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Data.List
import Data.Maybe
import Data.Ord (comparing)
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
import Prelude hiding (log, id)
import System.Directory
import System.FilePath
import System.IO.Error (isDoesNotExistError)
import System.Posix.Files
import qualified Text.JSON
import Text.JSON.Types

import Ganeti.BasicTypes
import qualified Ganeti.Constants as C
import Ganeti.JSON
import Ganeti.Logging
import Ganeti.Objects (Node)
import Ganeti.OpCodes
import Ganeti.Path
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
                   RpcCallJobqueueUpdate(..))
import Ganeti.THH
import Ganeti.Types
import Ganeti.Utils

-- * Data types

-- | The Ganeti queue timestamp type.
type Timestamp = (Int, Int)

-- | Missing timestamp type.
noTimestamp :: Timestamp
noTimestamp = (-1, -1)

-- | An input opcode.
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
                   deriving (Show, Eq)

-- | JSON instance for 'InputOpCode', trying to parse it and if
-- failing, keeping the original JSValue.
instance Text.JSON.JSON InputOpCode where
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
  showJSON (InvalidOpCode inv) = inv
  readJSON v = case Text.JSON.readJSON v of
                 Text.JSON.Error _ -> return $ InvalidOpCode v
                 Text.JSON.Ok mo -> return $ ValidOpCode mo
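-- Illustrative sketch (added for clarity, hypothetical input): since
-- 'readJSON' above never fails, decoding any JSON document as an
-- 'InputOpCode' succeeds, and an opcode that does not parse (e.g. one with
-- an unknown OP_ID) is kept verbatim as an 'InvalidOpCode', so it can be
-- re-serialised unchanged:
--
-- > case Text.JSON.decode "{\"OP_ID\": \"OP_NO_SUCH_OPCODE\"}" of
-- >   Text.JSON.Ok (InvalidOpCode raw) -> Text.JSON.encode raw  -- original value kept
-- >   Text.JSON.Ok (ValidOpCode mo)    -> Text.JSON.encode mo
-- >   Text.JSON.Error err              -> error err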

    
-- | Invalid opcode summary.
invalidOp :: String
invalidOp = "INVALID_OP"

-- | Tries to extract the opcode summary from an 'InputOpCode'. This
-- duplicates some functionality from the 'opSummary' function in
-- "Ganeti.OpCodes".
extractOpSummary :: InputOpCode -> String
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
extractOpSummary (InvalidOpCode (JSObject o)) =
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
    Just s -> drop 3 s -- drop the OP_ prefix
    Nothing -> invalidOp
extractOpSummary _ = invalidOp
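-- Illustrative examples (added for clarity, hypothetical values): a valid
-- opcode is summarised via 'opSummary', while for an invalid one only its
-- OP_ID field, minus the "OP_" prefix, or the fallback is available:
--
-- > extractOpSummary (InvalidOpCode (JSObject (toJSObject
-- >   [("OP_ID", JSString (toJSString "OP_INSTANCE_CREATE"))])))
-- >     == "INSTANCE_CREATE"
-- > extractOpSummary (InvalidOpCode JSNull) == "INVALID_OP"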

    
$(buildObject "QueuedOpCode" "qo"
  [ simpleField "input"           [t| InputOpCode |]
  , simpleField "status"          [t| OpStatus    |]
  , simpleField "result"          [t| JSValue     |]
  , defaultField [| [] |] $
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
  , simpleField "priority"        [t| Int         |]
  , optionalNullSerField $
    simpleField "start_timestamp" [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "exec_timestamp"  [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "end_timestamp"   [t| Timestamp   |]
  ])
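-- For readers unfamiliar with "Ganeti.THH": the 'buildObject' splice above
-- generates, roughly, a record type plus JSON serialisation keyed on the
-- given field names. A simplified sketch of the generated record (the actual
-- splice also produces the serialisation code and handles the default and
-- optional fields):
--
-- > data QueuedOpCode = QueuedOpCode
-- >   { qoInput          :: InputOpCode
-- >   , qoStatus         :: OpStatus
-- >   , qoResult         :: JSValue
-- >   , qoLog            :: [(Int, Timestamp, ELogType, JSValue)]
-- >   , qoPriority       :: Int
-- >   , qoStartTimestamp :: Maybe Timestamp
-- >   , qoExecTimestamp  :: Maybe Timestamp
-- >   , qoEndTimestamp   :: Maybe Timestamp
-- >   }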

    
$(buildObject "QueuedJob" "qj"
  [ simpleField "id"                 [t| JobId          |]
  , simpleField "ops"                [t| [QueuedOpCode] |]
  , optionalNullSerField $
    simpleField "received_timestamp" [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "start_timestamp"    [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "end_timestamp"      [t| Timestamp      |]
  ])

-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode { qoInput = ValidOpCode op
               , qoStatus = OP_STATUS_QUEUED
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
                              $ op
               , qoLog = []
               , qoResult = JSNull
               , qoStartTimestamp = Nothing
               , qoEndTimestamp = Nothing
               , qoExecTimestamp = Nothing
               }

-- | From a job-id and a list of op-codes create a job. This is
-- the pure part of job creation, as allocating a new job id
-- lives in IO.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops = do
  ops' <- mapM (`resolveDependencies` jobid) ops
  return QueuedJob { qjId = jobid
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
                   , qjReceivedTimestamp = Nothing
                   , qjStartTimestamp = Nothing
                   , qjEndTimestamp = Nothing
                   }

-- | Job file prefix.
jobFilePrefix :: String
jobFilePrefix = "job-"

-- | Computes the filename for a given job ID.
jobFileName :: JobId -> FilePath
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)

-- | Parses a job ID from a file name.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  case stripPrefix jobFilePrefix path of
    Nothing -> fail $ "Job file '" ++ path ++
                      "' doesn't have the correct prefix"
    Just suffix -> makeJobIdS suffix

-- | Computes the full path to a live job.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir jid = rootdir </> jobFileName jid

-- | Computes the full path to an archived job.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
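-- A worked example (added for clarity, hypothetical values): archived jobs
-- are sharded into numbered sub-directories under the archive directory, so
-- assuming 'C.jstoreJobsPerArchiveDirectory' is 10000 and
-- 'jobQueueArchiveSubDir' is "archive", a job with id 12345 would end up at
--
-- > archivedJobFile "/var/lib/ganeti/queue" jid12345
-- >   == "/var/lib/ganeti/queue/archive/1/job-12345"
--
-- because 12345 `div` 10000 == 1.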

    
-- | Map from opcode status to job status.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR

-- | Computes a queued job's status.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
    where
      terminalStatus OP_STATUS_ERROR     = True
      terminalStatus OP_STATUS_CANCELING = True
      terminalStatus OP_STATUS_CANCELED  = True
      terminalStatus _                   = False
      softStatus     OP_STATUS_SUCCESS   = True
      softStatus     OP_STATUS_QUEUED    = True
      softStatus     _                   = False
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
      extractOpSt [] d False = d
      extractOpSt (x:xs) d old_all
           | terminalStatus x = opStatusToJob x -- abort recursion
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
           where new_all = x == OP_STATUS_SUCCESS && old_all
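-- An illustrative walk-through (added for clarity; 'job', 'op1' and 'op2'
-- are hypothetical values, e.g. built with 'queuedOpCodeFromMetaOpCode'):
--
-- > -- every opcode succeeded, so the job as a whole succeeded
-- > calcJobStatus job { qjOps = [ op1 { qoStatus = OP_STATUS_SUCCESS }
-- >                             , op2 { qoStatus = OP_STATUS_SUCCESS } ] }
-- >   == JOB_STATUS_SUCCESS
-- > -- a terminal status aborts the recursion and wins outright
-- > calcJobStatus job { qjOps = [ op1 { qoStatus = OP_STATUS_SUCCESS }
-- >                             , op2 { qoStatus = OP_STATUS_ERROR } ] }
-- >   == JOB_STATUS_ERROR
-- > -- a non-terminal, non-soft status becomes the new default
-- > calcJobStatus job { qjOps = [ op1 { qoStatus = OP_STATUS_SUCCESS }
-- >                             , op2 { qoStatus = OP_STATUS_RUNNING } ] }
-- >   == JOB_STATUS_RUNNING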

    
-- | Determine whether an opcode status is finalized.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized = (> OP_STATUS_RUNNING)

-- | Compute a job's priority.
calcJobPriority :: QueuedJob -> Int
calcJobPriority QueuedJob { qjOps = ops } =
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
    where helper [] = C.opPrioDefault
          helper ps = minimum ps
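-- Illustrative example (added for clarity; 'job', 'op1' and 'op2' are
-- hypothetical values): only opcodes that are not yet finalized contribute,
-- and the numerically lowest value wins, which in Ganeti's convention is the
-- most urgent one; with no pending opcodes the default 'C.opPrioDefault' is
-- returned:
--
-- > calcJobPriority job { qjOps = [ op1 { qoStatus = OP_STATUS_SUCCESS
-- >                                     , qoPriority = -10 }
-- >                               , op2 { qoStatus = OP_STATUS_QUEUED
-- >                                     , qoPriority = 0 } ] }
-- >   == 0  -- the already-finalized opcode's priority is ignored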

    
-- | Log but ignore an 'IOError'.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  unless (isDoesNotExistError e && ignore_noent) .
    logWarning $ msg ++ ": " ++ show e
  return a

-- | Compute the list of existing archive directories. Note that I/O
-- exceptions are swallowed and ignored.
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  filterM (\path ->
             liftM isDirectory (getFileStatus (adir </> path))
               `Control.Exception.catch`
               ignoreIOError False True
                 ("Failed to stat archive path " ++ path)) fpaths

-- | Build list of directories containing job files. Note: compared to
-- the Python version, this doesn't ignore a potential lost+found
-- file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived = do
  other <- if archived
             then allArchiveDirs rootdir
             else return []
  return $ rootdir:other

-- Function equivalent to the \'sequence\' function, which cannot be used
-- because of a library version conflict on Lucid.
-- FIXME: delete this and just use \'sequence\' instead when Lucid compatibility
-- will not be required anymore.
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
sequencer l = fmap reverse $ foldl seqFolder (Right []) l

-- | Folding function for joining multiple [JobIds] into one list.
seqFolder :: Either IOError [[JobId]]
          -> Either IOError [JobId]
          -> Either IOError [[JobId]]
seqFolder (Left e) _ = Left e
seqFolder (Right _) (Left e) = Left e
seqFolder (Right l) (Right el) = Right $ el:l
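-- Illustrative examples (added for clarity; 'jid1', 'jid2', 'jid3' and
-- 'ioErr' are hypothetical values): 'sequencer' collects all the 'Right'
-- lists, or returns the first 'Left' it encounters:
--
-- > sequencer [Right [jid1], Right [jid2, jid3]] == Right [[jid1], [jid2, jid3]]
-- > sequencer [Right [jid1], Left ioErr, Right [jid2]] == Left ioErr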

    
-- | Computes the list of all jobs in the given directories.
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)

-- | Sorts a list of job IDs.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy (comparing fromJobId)

-- | Computes the list of jobs in a given directory.
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
getDirJobIDs path = do
  either_contents <-
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
  case either_contents of
    Left e -> do
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
      return $ Left e
    Right contents -> do
      let jids = foldl (\ids file ->
                         case parseJobFileId file of
                           Nothing -> ids
                           Just new_id -> new_id:ids) [] contents
      return . Right $ reverse jids

-- | Reads the job data from disk.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid = do
  let live_path = liveJobFile rootdir jid
      archived_path = archivedJobFile rootdir jid
      all_paths = if archived
                    then [(live_path, False), (archived_path, True)]
                    else [(live_path, False)]
  foldM (\state (path, isarchived) ->
           liftM (\r -> Just (r, isarchived)) (readFile path)
             `Control.Exception.catch`
             ignoreIOError state True
               ("Failed to read job file " ++ path)) Nothing all_paths

-- | Failed to load job error.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"

-- | Loads a job from disk.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: we need some strictness below, otherwise the wrapping in a
  -- Result will create too much laziness, and not close the file
  -- descriptors for the individual jobs
  return $! case raw of
             Nothing -> noSuchJob
             Just (str, arch) ->
               liftM (\qj -> (qj, arch)) .
               fromJResult "Parsing job file" $ Text.JSON.decode str

-- | Write a job to disk.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
writeJobToDisk rootdir job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  tryAndLogIOError (atomicWriteFile filename content)
                   ("Failed to write " ++ filename) Ok

-- | Replicate a job to all master candidates.
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
replicateJob rootdir mastercandidates job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  result <- executeRpcCall mastercandidates
              $ RpcCallJobqueueUpdate filename content
  logRpcErrors result
  return result

-- | Replicate many jobs to all master candidates.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates =
  mapM_ (replicateJob rootdir mastercandidates)

-- | Read the job serial number from disk.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk = do
  filename <- jobQueueSerialFile
  tryAndLogIOError (readFile filename) "Failed to read serial file"
                   (makeJobIdS . rStripSpace)

-- | Allocate new job ids.
-- To avoid races while accessing the serial file, the threads synchronize
-- over a lock, as usual provided by an MVar.
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n =
  if n <= 0
    then return . Bad $ "Can only allocate positive number of job ids"
    else do
      takeMVar lock
      rjobid <- readSerialFromDisk
      case rjobid of
        Bad s -> do
          putMVar lock ()
          return . Bad $ s
        Ok jid -> do
          let current = fromJobId jid
              serial_content = show (current + n) ++ "\n"
          serial <- jobQueueSerialFile
          write_result <- try $ atomicWriteFile serial serial_content
                          :: IO (Either IOError ())
          case write_result of
            Left e -> do
              putMVar lock ()
              let msg = "Failed to write serial file: " ++ show e
              logError msg
              return . Bad $ msg
            Right () -> do
              _ <- executeRpcCall mastercandidates
                     $ RpcCallJobqueueUpdate serial serial_content
              putMVar lock ()
              return $ mapM makeJobId [(current+1)..(current+n)]
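-- A worked example (added for clarity): if the serial file currently holds
-- "10", then
--
-- > allocateJobIds mastercandidates lock 3
--
-- rewrites the serial file to contain "13\n", pushes the new serial to all
-- master candidates via 'RpcCallJobqueueUpdate', and returns 'Ok' with the
-- job ids 11, 12 and 13, all while holding the lock so that concurrent
-- callers cannot be handed overlapping ranges.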

    
-- | Allocate one new job id.
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
allocateJobId mastercandidates lock = do
  jids <- allocateJobIds mastercandidates lock 1
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")

-- | Decide if the job queue is open.
isQueueOpen :: IO Bool
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)
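-- An end-to-end sketch (added for clarity, with simplified error handling;
-- 'submitJobSketch' itself is hypothetical and not part of this module) of
-- how the functions above fit together when submitting a single job:
--
-- > submitJobSketch :: FilePath -> [Node] -> MVar () -> [MetaOpCode]
-- >                 -> IO (Result JobId)
-- > submitJobSketch rootdir candidates lock ops = do
-- >   rjid <- allocateJobId candidates lock             -- reserve a fresh job id
-- >   case rjid of
-- >     Bad msg -> return $ Bad msg
-- >     Ok jid  -> do
-- >       job <- queuedJobFromOpCodes jid ops           -- pure construction of the job
-- >       rwrite <- writeJobToDisk rootdir job          -- persist it in the queue dir
-- >       case rwrite of
-- >         Bad msg -> return $ Bad msg
-- >         Ok ()   -> do
-- >           replicateManyJobs rootdir candidates [job]  -- copy to master candidates
-- >           return $ Ok jid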