Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ 493d6920

History | View | Annotate | Download (15.5 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the job queue.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2010, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.JQueue
29
    ( QueuedOpCode(..)
30
    , QueuedJob(..)
31
    , InputOpCode(..)
32
    , queuedOpCodeFromMetaOpCode
33
    , queuedJobFromOpCodes
34
    , Timestamp
35
    , noTimestamp
36
    , opStatusFinalized
37
    , extractOpSummary
38
    , calcJobStatus
39
    , calcJobPriority
40
    , jobFileName
41
    , liveJobFile
42
    , archivedJobFile
43
    , determineJobDirectories
44
    , getJobIDs
45
    , sortJobIDs
46
    , loadJobFromDisk
47
    , noSuchJob
48
    , readSerialFromDisk
49
    , allocateJobIds
50
    , allocateJobId
51
    , writeJobToDisk
52
    , replicateManyJobs
53
    , isQueueOpen
54
    , enqueueJobs
55
    ) where
56

    
57
import Control.Concurrent.MVar
58
import Control.Exception
59
import Control.Monad
60
import Data.List
61
import Data.Maybe
62
import Data.Ord (comparing)
63
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
64
import Prelude hiding (log, id)
65
import System.Directory
66
import System.FilePath
67
import System.IO.Error (isDoesNotExistError)
68
import System.Posix.Files
69
import qualified Text.JSON
70
import Text.JSON.Types
71

    
72
import Ganeti.BasicTypes
73
import qualified Ganeti.Constants as C
74
import Ganeti.JSON
75
import Ganeti.Logging
76
import Ganeti.Luxi
77
import Ganeti.Objects (Node)
78
import Ganeti.OpCodes
79
import Ganeti.Path
80
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
81
                   RpcCallJobqueueUpdate(..))
82
import Ganeti.THH
83
import Ganeti.Types
84
import Ganeti.Utils
85

    
86
-- * Data types
87

    
88
-- | The ganeti queue timestamp type
89
type Timestamp = (Int, Int)
90

    
91
-- | Missing timestamp type.
92
noTimestamp :: Timestamp
93
noTimestamp = (-1, -1)
94

    
95
-- | An input opcode.
96
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
97
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
98
                   deriving (Show, Eq)
99

    
100
-- | JSON instance for 'InputOpCode', trying to parse it and if
101
-- failing, keeping the original JSValue.
102
instance Text.JSON.JSON InputOpCode where
103
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
104
  showJSON (InvalidOpCode inv) = inv
105
  readJSON v = case Text.JSON.readJSON v of
106
                 Text.JSON.Error _ -> return $ InvalidOpCode v
107
                 Text.JSON.Ok mo -> return $ ValidOpCode mo
108

    
109
-- | Invalid opcode summary.
110
invalidOp :: String
111
invalidOp = "INVALID_OP"
112

    
113
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
114
-- duplicates some functionality from the 'opSummary' function in
115
-- "Ganeti.OpCodes".
116
extractOpSummary :: InputOpCode -> String
117
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
118
extractOpSummary (InvalidOpCode (JSObject o)) =
119
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
120
    Just s -> drop 3 s -- drop the OP_ prefix
121
    Nothing -> invalidOp
122
extractOpSummary _ = invalidOp
123

    
124
$(buildObject "QueuedOpCode" "qo"
125
  [ simpleField "input"           [t| InputOpCode |]
126
  , simpleField "status"          [t| OpStatus    |]
127
  , simpleField "result"          [t| JSValue     |]
128
  , defaultField [| [] |] $
129
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
130
  , simpleField "priority"        [t| Int         |]
131
  , optionalNullSerField $
132
    simpleField "start_timestamp" [t| Timestamp   |]
133
  , optionalNullSerField $
134
    simpleField "exec_timestamp"  [t| Timestamp   |]
135
  , optionalNullSerField $
136
    simpleField "end_timestamp"   [t| Timestamp   |]
137
  ])
138

    
139
$(buildObject "QueuedJob" "qj"
140
  [ simpleField "id"                 [t| JobId          |]
141
  , simpleField "ops"                [t| [QueuedOpCode] |]
142
  , optionalNullSerField $
143
    simpleField "received_timestamp" [t| Timestamp      |]
144
  , optionalNullSerField $
145
    simpleField "start_timestamp"    [t| Timestamp      |]
146
  , optionalNullSerField $
147
    simpleField "end_timestamp"      [t| Timestamp      |]
148
  ])
149

    
150
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
151
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
152
queuedOpCodeFromMetaOpCode op =
153
  QueuedOpCode { qoInput = ValidOpCode op
154
               , qoStatus = OP_STATUS_QUEUED
155
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
156
                              $ op
157
               , qoLog = []
158
               , qoResult = JSNull
159
               , qoStartTimestamp = Nothing
160
               , qoEndTimestamp = Nothing
161
               , qoExecTimestamp = Nothing
162
               }
163

    
164
-- | From a job-id and a list of op-codes create a job. This is
165
-- the pure part of job creation, as allocating a new job id
166
-- lives in IO.
167
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
168
queuedJobFromOpCodes jobid ops = do
169
  ops' <- mapM (`resolveDependencies` jobid) ops
170
  return QueuedJob { qjId = jobid
171
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
172
                   , qjReceivedTimestamp = Nothing 
173
                   , qjStartTimestamp = Nothing
174
                   , qjEndTimestamp = Nothing
175
                   }
176

    
177
-- | Job file prefix.
178
jobFilePrefix :: String
179
jobFilePrefix = "job-"
180

    
181
-- | Computes the filename for a given job ID.
182
jobFileName :: JobId -> FilePath
183
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
184

    
185
-- | Parses a job ID from a file name.
186
parseJobFileId :: (Monad m) => FilePath -> m JobId
187
parseJobFileId path =
188
  case stripPrefix jobFilePrefix path of
189
    Nothing -> fail $ "Job file '" ++ path ++
190
                      "' doesn't have the correct prefix"
191
    Just suffix -> makeJobIdS suffix
192

    
193
-- | Computes the full path to a live job.
194
liveJobFile :: FilePath -> JobId -> FilePath
195
liveJobFile rootdir jid = rootdir </> jobFileName jid
196

    
197
-- | Computes the full path to an archives job. BROKEN.
198
archivedJobFile :: FilePath -> JobId -> FilePath
199
archivedJobFile rootdir jid =
200
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
201
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
202

    
203
-- | Map from opcode status to job status.
204
opStatusToJob :: OpStatus -> JobStatus
205
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
206
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
207
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
208
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
209
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
210
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
211
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR
212

    
213
-- | Computes a queued job's status.
214
calcJobStatus :: QueuedJob -> JobStatus
215
calcJobStatus QueuedJob { qjOps = ops } =
216
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
217
    where
218
      terminalStatus OP_STATUS_ERROR     = True
219
      terminalStatus OP_STATUS_CANCELING = True
220
      terminalStatus OP_STATUS_CANCELED  = True
221
      terminalStatus _                   = False
222
      softStatus     OP_STATUS_SUCCESS   = True
223
      softStatus     OP_STATUS_QUEUED    = True
224
      softStatus     _                   = False
225
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
226
      extractOpSt [] d False = d
227
      extractOpSt (x:xs) d old_all
228
           | terminalStatus x = opStatusToJob x -- abort recursion
229
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
230
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
231
           where new_all = x == OP_STATUS_SUCCESS && old_all
232

    
233
-- | Determine whether an opcode status is finalized.
234
opStatusFinalized :: OpStatus -> Bool
235
opStatusFinalized = (> OP_STATUS_RUNNING)
236

    
237
-- | Compute a job's priority.
238
calcJobPriority :: QueuedJob -> Int
239
calcJobPriority QueuedJob { qjOps = ops } =
240
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
241
    where helper [] = C.opPrioDefault
242
          helper ps = minimum ps
243

    
244
-- | Log but ignore an 'IOError'.
245
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
246
ignoreIOError a ignore_noent msg e = do
247
  unless (isDoesNotExistError e && ignore_noent) .
248
    logWarning $ msg ++ ": " ++ show e
249
  return a
250

    
251
-- | Compute the list of existing archive directories. Note that I/O
252
-- exceptions are swallowed and ignored.
253
allArchiveDirs :: FilePath -> IO [FilePath]
254
allArchiveDirs rootdir = do
255
  let adir = rootdir </> jobQueueArchiveSubDir
256
  contents <- getDirectoryContents adir `Control.Exception.catch`
257
               ignoreIOError [] False
258
                 ("Failed to list queue directory " ++ adir)
259
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
260
  filterM (\path ->
261
             liftM isDirectory (getFileStatus (adir </> path))
262
               `Control.Exception.catch`
263
               ignoreIOError False True
264
                 ("Failed to stat archive path " ++ path)) fpaths
265

    
266
-- | Build list of directories containing job files. Note: compared to
267
-- the Python version, this doesn't ignore a potential lost+found
268
-- file.
269
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
270
determineJobDirectories rootdir archived = do
271
  other <- if archived
272
             then allArchiveDirs rootdir
273
             else return []
274
  return $ rootdir:other
275

    
276
-- Function equivalent to the \'sequence\' function, that cannot be used because
277
-- of library version conflict on Lucid.
278
-- FIXME: delete this and just use \'sequence\' instead when Lucid compatibility
279
-- will not be required anymore.
280
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
281
sequencer l = fmap reverse $ foldl seqFolder (Right []) l
282

    
283
-- | Folding function for joining multiple [JobIds] into one list.
284
seqFolder :: Either IOError [[JobId]]
285
          -> Either IOError [JobId]
286
          -> Either IOError [[JobId]]
287
seqFolder (Left e) _ = Left e
288
seqFolder (Right _) (Left e) = Left e
289
seqFolder (Right l) (Right el) = Right $ el:l
290

    
291
-- | Computes the list of all jobs in the given directories.
292
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
293
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)
294

    
295
-- | Sorts the a list of job IDs.
296
sortJobIDs :: [JobId] -> [JobId]
297
sortJobIDs = sortBy (comparing fromJobId)
298

    
299
-- | Computes the list of jobs in a given directory.
300
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
301
getDirJobIDs path = do
302
  either_contents <-
303
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
304
  case either_contents of
305
    Left e -> do
306
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
307
      return $ Left e
308
    Right contents -> do
309
      let jids = foldl (\ids file ->
310
                         case parseJobFileId file of
311
                           Nothing -> ids
312
                           Just new_id -> new_id:ids) [] contents
313
      return . Right $ reverse jids
314

    
315
-- | Reads the job data from disk.
316
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
317
readJobDataFromDisk rootdir archived jid = do
318
  let live_path = liveJobFile rootdir jid
319
      archived_path = archivedJobFile rootdir jid
320
      all_paths = if archived
321
                    then [(live_path, False), (archived_path, True)]
322
                    else [(live_path, False)]
323
  foldM (\state (path, isarchived) ->
324
           liftM (\r -> Just (r, isarchived)) (readFile path)
325
             `Control.Exception.catch`
326
             ignoreIOError state True
327
               ("Failed to read job file " ++ path)) Nothing all_paths
328

    
329
-- | Failed to load job error.
330
noSuchJob :: Result (QueuedJob, Bool)
331
noSuchJob = Bad "Can't load job file"
332

    
333
-- | Loads a job from disk.
334
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
335
loadJobFromDisk rootdir archived jid = do
336
  raw <- readJobDataFromDisk rootdir archived jid
337
  -- note: we need some stricness below, otherwise the wrapping in a
338
  -- Result will create too much lazyness, and not close the file
339
  -- descriptors for the individual jobs
340
  return $! case raw of
341
             Nothing -> noSuchJob
342
             Just (str, arch) ->
343
               liftM (\qj -> (qj, arch)) .
344
               fromJResult "Parsing job file" $ Text.JSON.decode str
345

    
346
-- | Write a job to disk.
347
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
348
writeJobToDisk rootdir job = do
349
  let filename = liveJobFile rootdir . qjId $ job
350
      content = Text.JSON.encode . Text.JSON.showJSON $ job
351
  tryAndLogIOError (atomicWriteFile filename content)
352
                   ("Failed to write " ++ filename) Ok
353

    
354
-- | Replicate a job to all master candidates.
355
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
356
replicateJob rootdir mastercandidates job = do
357
  let filename = liveJobFile rootdir . qjId $ job
358
      content = Text.JSON.encode . Text.JSON.showJSON $ job
359
  result <- executeRpcCall mastercandidates
360
              $ RpcCallJobqueueUpdate filename content
361
  logRpcErrors result
362
  return result
363

    
364
-- | Replicate many jobs to all master candidates.
365
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
366
replicateManyJobs rootdir mastercandidates =
367
  mapM_ (replicateJob rootdir mastercandidates)
368

    
369
-- | Read the job serial number from disk.
370
readSerialFromDisk :: IO (Result JobId)
371
readSerialFromDisk = do
372
  filename <- jobQueueSerialFile
373
  tryAndLogIOError (readFile filename) "Failed to read serial file"
374
                   (makeJobIdS . rStripSpace)
375

    
376
-- | Allocate new job ids.
377
-- To avoid races while accessing the serial file, the threads synchronize
378
-- over a lock, as usual provided by an MVar.
379
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
380
allocateJobIds mastercandidates lock n =
381
  if n <= 0
382
    then return . Bad $ "Can only allocate positive number of job ids"
383
    else do
384
      takeMVar lock
385
      rjobid <- readSerialFromDisk
386
      case rjobid of
387
        Bad s -> do
388
          putMVar lock ()
389
          return . Bad $ s
390
        Ok jid -> do
391
          let current = fromJobId jid
392
              serial_content = show (current + n) ++  "\n"
393
          serial <- jobQueueSerialFile
394
          write_result <- try $ atomicWriteFile serial serial_content
395
                          :: IO (Either IOError ())
396
          case write_result of
397
            Left e -> do
398
              putMVar lock ()
399
              let msg = "Failed to write serial file: " ++ show e
400
              logError msg
401
              return . Bad $ msg 
402
            Right () -> do
403
              _ <- executeRpcCall mastercandidates
404
                     $ RpcCallJobqueueUpdate serial serial_content
405
              putMVar lock ()
406
              return $ mapM makeJobId [(current+1)..(current+n)]
407

    
408
-- | Allocate one new job id.
409
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
410
allocateJobId mastercandidates lock = do
411
  jids <- allocateJobIds mastercandidates lock 1
412
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")
413

    
414
-- | Decide if job queue is open
415
isQueueOpen :: IO Bool
416
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)
417

    
418
-- | Enqueue jobs. This will guarantee that jobs get executed eventually.
419
-- Curenntly, the implementation is to unconditionally hand over the job
420
-- to masterd.
421
enqueueJobs :: [QueuedJob] -> IO ()
422
enqueueJobs jobs = do
423
  socketpath <- defaultMasterSocket
424
  client <- getClient socketpath
425
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
426
  let failures = map show $ justBad pickupResults
427
  unless (null failures)
428
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures