
src/Ganeti/JQueue.hs @ d605e261


{-# LANGUAGE TemplateHaskell #-}

{-| Implementation of the job queue.

-}

{-

Copyright (C) 2010, 2012 Google Inc.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.

-}

module Ganeti.JQueue
    ( QueuedOpCode(..)
    , QueuedJob(..)
    , InputOpCode(..)
    , queuedOpCodeFromMetaOpCode
    , queuedJobFromOpCodes
    , Timestamp
    , noTimestamp
    , currentTimestamp
    , setReceivedTimestamp
    , opStatusFinalized
    , extractOpSummary
    , calcJobStatus
    , jobStarted
    , jobFinalized
    , calcJobPriority
    , jobFileName
    , liveJobFile
    , archivedJobFile
    , determineJobDirectories
    , getJobIDs
    , sortJobIDs
    , loadJobFromDisk
    , noSuchJob
    , readSerialFromDisk
    , allocateJobIds
    , allocateJobId
    , writeJobToDisk
    , replicateManyJobs
    , isQueueOpen
    , startJobs
    ) where

import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Data.List
import Data.Maybe
import Data.Ord (comparing)
-- work around what seems to be a bug in ghc 7.4's TH shadowing code
import Prelude hiding (log, id)
import System.Directory
import System.FilePath
import System.IO.Error (isDoesNotExistError)
import System.Posix.Files
import System.Time
import qualified Text.JSON
import Text.JSON.Types

import Ganeti.BasicTypes
import qualified Ganeti.Constants as C
import Ganeti.JSON
import Ganeti.Logging
import Ganeti.Luxi
import Ganeti.Objects (Node)
import Ganeti.OpCodes
import Ganeti.Path
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
                   RpcCallJobqueueUpdate(..))
import Ganeti.THH
import Ganeti.Types
import Ganeti.Utils

-- * Data types

-- | The Ganeti queue timestamp type. It represents the time as the pair
-- of seconds since the epoch and microseconds since the beginning of the
-- second.
type Timestamp = (Int, Int)

-- | Missing timestamp type.
noTimestamp :: Timestamp
noTimestamp = (-1, -1)

-- | Get the current time in the job-queue timestamp format.
currentTimestamp :: IO Timestamp
currentTimestamp = do
  TOD ctime pico <- getClockTime
  return (fromIntegral ctime, fromIntegral $ pico `div` 1000000)

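-- A small sketch of the format (the numbers are made up): @(1480000000,
-- 250000)@ denotes 250 milliseconds past that epoch second, while
-- 'noTimestamp' is the conventional "unset" value. Typical use is simply:
--
-- > (secs, micros) <- currentTimestamp
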
-- | An input opcode.
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
                   deriving (Show, Eq)

-- | JSON instance for 'InputOpCode', trying to parse it and if
-- failing, keeping the original JSValue.
instance Text.JSON.JSON InputOpCode where
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
  showJSON (InvalidOpCode inv) = inv
  readJSON v = case Text.JSON.readJSON v of
                 Text.JSON.Error _ -> return $ InvalidOpCode v
                 Text.JSON.Ok mo -> return $ ValidOpCode mo

-- | Invalid opcode summary.
invalidOp :: String
invalidOp = "INVALID_OP"

-- | Tries to extract the opcode summary from an 'InputOpCode'. This
-- duplicates some functionality from the 'opSummary' function in
-- "Ganeti.OpCodes".
extractOpSummary :: InputOpCode -> String
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
extractOpSummary (InvalidOpCode (JSObject o)) =
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
    Just s -> drop 3 s -- drop the OP_ prefix
    Nothing -> invalidOp
extractOpSummary _ = invalidOp

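-- Behaviour sketch for the fallback path: a JSON value that does not parse
-- as a 'MetaOpCode' is kept verbatim as an 'InvalidOpCode', and its summary
-- is taken from its @OP_ID@ field with the @OP_@ prefix stripped, falling
-- back to @"INVALID_OP"@ otherwise, e.g.
--
-- > extractOpSummary (InvalidOpCode JSNull) == "INVALID_OP"
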
$(buildObject "QueuedOpCode" "qo"
  [ simpleField "input"           [t| InputOpCode |]
  , simpleField "status"          [t| OpStatus    |]
  , simpleField "result"          [t| JSValue     |]
  , defaultField [| [] |] $
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
  , simpleField "priority"        [t| Int         |]
  , optionalNullSerField $
    simpleField "start_timestamp" [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "exec_timestamp"  [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "end_timestamp"   [t| Timestamp   |]
  ])

$(buildObject "QueuedJob" "qj"
  [ simpleField "id"                 [t| JobId          |]
  , simpleField "ops"                [t| [QueuedOpCode] |]
  , optionalNullSerField $
    simpleField "received_timestamp" [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "start_timestamp"    [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "end_timestamp"      [t| Timestamp      |]
  ])

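-- Note: 'buildObject' (from "Ganeti.THH") turns the field lists above into
-- the 'QueuedOpCode' and 'QueuedJob' record types, with accessors carrying
-- the given prefixes (e.g. 'qoInput', 'qoStatus', 'qjId', 'qjOps') and with
-- JSON (de)serialisation instances; fields wrapped in 'optionalNullSerField'
-- become 'Maybe' values, serialised as JSON @null@ when unset.
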
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode.
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode { qoInput = ValidOpCode op
               , qoStatus = OP_STATUS_QUEUED
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
                              $ op
               , qoLog = []
               , qoResult = JSNull
               , qoStartTimestamp = Nothing
               , qoEndTimestamp = Nothing
               , qoExecTimestamp = Nothing
               }

-- | From a job-id and a list of op-codes create a job. This is
-- the pure part of job creation, as allocating a new job id
-- lives in IO.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops = do
  ops' <- mapM (`resolveDependencies` jobid) ops
  return QueuedJob { qjId = jobid
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
                   , qjReceivedTimestamp = Nothing
                   , qjStartTimestamp = Nothing
                   , qjEndTimestamp = Nothing
                   }

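-- A minimal construction sketch, pinning the monad to 'Result' (here @jid@
-- is a freshly allocated 'JobId' and @mops@ a list of 'MetaOpCode's; a
-- failed dependency resolution, if any, surfaces as 'Bad'):
--
-- > let res = queuedJobFromOpCodes jid mops :: Result QueuedJob
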
-- | Attach a received timestamp to a Queued Job.
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }

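-- Typically this is combined with 'currentTimestamp' right after submission
-- (sketch; @job@ is assumed to come from 'queuedJobFromOpCodes'):
--
-- > ts <- currentTimestamp
-- > let job' = setReceivedTimestamp ts job
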
-- | Job file prefix.
jobFilePrefix :: String
jobFilePrefix = "job-"

-- | Computes the filename for a given job ID.
jobFileName :: JobId -> FilePath
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)

-- | Parses a job ID from a file name.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  case stripPrefix jobFilePrefix path of
    Nothing -> fail $ "Job file '" ++ path ++
                      "' doesn't have the correct prefix"
    Just suffix -> makeJobIdS suffix

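-- File names and job IDs are meant to round-trip (sketch, pinning the
-- monads of 'makeJobId' and 'parseJobFileId' to 'Result'):
--
-- > fmap jobFileName (makeJobId 42 :: Result JobId)  -- Ok "job-42"
-- > parseJobFileId "job-42" :: Result JobId          -- parses back the same id
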
-- | Computes the full path to a live job.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir jid = rootdir </> jobFileName jid

-- | Computes the full path to an archived job.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid

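-- Layout sketch (the queue root, the archive subdirectory name and the
-- constant's value below are only assumptions): with 10000 jobs per archive
-- directory, job 12345 lands in subdirectory @"1"@, i.e. something like
--
-- > archivedJobFile "/var/lib/ganeti/queue" jid
-- >   -- ~ "/var/lib/ganeti/queue/archive/1/job-12345"
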
-- | Map from opcode status to job status.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR

-- | Computes a queued job's status.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
    where
      terminalStatus OP_STATUS_ERROR     = True
      terminalStatus OP_STATUS_CANCELING = True
      terminalStatus OP_STATUS_CANCELED  = True
      terminalStatus _                   = False
      softStatus     OP_STATUS_SUCCESS   = True
      softStatus     OP_STATUS_QUEUED    = True
      softStatus     _                   = False
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
      extractOpSt [] d False = d
      extractOpSt (x:xs) d old_all
           | terminalStatus x = opStatusToJob x -- abort recursion
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
           where new_all = x == OP_STATUS_SUCCESS && old_all

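-- In other words (a prose sketch of the fold above): a terminal opcode
-- status (error, canceling, canceled) decides the job status immediately;
-- otherwise the job reports the status of the last opcode that was neither
-- queued nor successful (e.g. running or waiting); and only when every
-- opcode succeeded is the whole job considered successful. For example,
-- opcode statuses [SUCCESS, RUNNING, QUEUED] give JOB_STATUS_RUNNING, while
-- [SUCCESS, SUCCESS] give JOB_STATUS_SUCCESS.
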
-- | Determine if a job has started.
jobStarted :: QueuedJob -> Bool
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus

-- | Determine if a job is finalized.
jobFinalized :: QueuedJob -> Bool
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus

-- | Determine whether an opcode status is finalized.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized = (> OP_STATUS_RUNNING)

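-- Note: these predicates rely on the declaration order of the status
-- constructors, where every state past the running one is final (sketch):
--
-- > opStatusFinalized OP_STATUS_SUCCESS  -- True
-- > opStatusFinalized OP_STATUS_QUEUED   -- False
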
-- | Compute a job's priority.
calcJobPriority :: QueuedJob -> Int
calcJobPriority QueuedJob { qjOps = ops } =
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
    where helper [] = C.opPrioDefault
          helper ps = minimum ps

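-- Sketch: with Ganeti's convention that numerically smaller values mean more
-- urgent, the job gets the priority of its most urgent non-finalized opcode;
-- with no pending opcodes it falls back to 'C.opPrioDefault', e.g.
--
-- > calcJobPriority job  -- -10, given pending opcode priorities [-10, 0]
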
-- | Log but ignore an 'IOError'.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  unless (isDoesNotExistError e && ignore_noent) .
    logWarning $ msg ++ ": " ++ show e
  return a

-- | Compute the list of existing archive directories. Note that I/O
-- exceptions are swallowed and ignored.
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  filterM (\path ->
             liftM isDirectory (getFileStatus (adir </> path))
               `Control.Exception.catch`
               ignoreIOError False True
                 ("Failed to stat archive path " ++ path)) fpaths

-- | Build list of directories containing job files. Note: compared to
-- the Python version, this doesn't ignore a potential lost+found
-- file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived = do
  other <- if archived
             then allArchiveDirs rootdir
             else return []
  return $ rootdir:other

-- Function equivalent to the \'sequence\' function, which cannot be used
-- because of a library version conflict on Lucid.
-- FIXME: delete this and just use \'sequence\' instead once Lucid
-- compatibility is no longer required.
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
sequencer l = fmap reverse $ foldl seqFolder (Right []) l

-- | Folding function for joining multiple [JobIds] into one list.
seqFolder :: Either IOError [[JobId]]
          -> Either IOError [JobId]
          -> Either IOError [[JobId]]
seqFolder (Left e) _ = Left e
seqFolder (Right _) (Left e) = Left e
seqFolder (Right l) (Right el) = Right $ el:l

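-- Semantics sketch (for some job-id lists @a@, @b@ and an error @err@):
-- results are collected in order and the first failure wins, mirroring
-- what \'sequence\' would do for 'Either':
--
-- > sequencer [Right a, Right b]  -- Right [a, b]
-- > sequencer [Right a, Left err] -- Left err
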
-- | Computes the list of all jobs in the given directories.
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)

-- | Sorts a list of job IDs.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy (comparing fromJobId)

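-- Directory-scan sketch (the queue root below is only an example path):
--
-- > dirs <- determineJobDirectories "/var/lib/ganeti/queue" True
-- > ids  <- getJobIDs dirs
-- > let ordered = either (const []) sortJobIDs ids
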
-- | Computes the list of jobs in a given directory.
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
getDirJobIDs path = do
  either_contents <-
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
  case either_contents of
    Left e -> do
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
      return $ Left e
    Right contents -> do
      let jids = foldl (\ids file ->
                         case parseJobFileId file of
                           Nothing -> ids
                           Just new_id -> new_id:ids) [] contents
      return . Right $ reverse jids

-- | Reads the job data from disk.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid = do
  let live_path = liveJobFile rootdir jid
      archived_path = archivedJobFile rootdir jid
      all_paths = if archived
                    then [(live_path, False), (archived_path, True)]
                    else [(live_path, False)]
  foldM (\state (path, isarchived) ->
           liftM (\r -> Just (r, isarchived)) (readFile path)
             `Control.Exception.catch`
             ignoreIOError state True
               ("Failed to read job file " ++ path)) Nothing all_paths

-- | Failed to load job error.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"

-- | Loads a job from disk.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: we need some strictness below, otherwise the wrapping in a
  -- Result will create too much laziness, and not close the file
  -- descriptors for the individual jobs
  return $! case raw of
             Nothing -> noSuchJob
             Just (str, arch) ->
               liftM (\qj -> (qj, arch)) .
               fromJResult "Parsing job file" $ Text.JSON.decode str

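-- Loading sketch (the queue root is an example path; the returned 'Bool'
-- says whether the job was found in the archive):
--
-- > res <- loadJobFromDisk "/var/lib/ganeti/queue" True jid
-- > case res of
-- >   Ok (job, _archived) -> logInfo $ "Loaded job " ++ show (fromJobId (qjId job))
-- >   Bad msg             -> logWarning msg
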
-- | Write a job to disk.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
writeJobToDisk rootdir job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  tryAndLogIOError (atomicWriteFile filename content)
                   ("Failed to write " ++ filename) Ok

-- | Replicate a job to all master candidates.
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
replicateJob rootdir mastercandidates job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  result <- executeRpcCall mastercandidates
              $ RpcCallJobqueueUpdate filename content
  logRpcErrors result
  return result

-- | Replicate many jobs to all master candidates.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates =
  mapM_ (replicateJob rootdir mastercandidates)

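-- A common pattern (sketch; @rootdir@, @mastercandidates@ and @job@ are
-- assumed bindings) is to persist the job locally first and then push the
-- very same file to the master candidates:
--
-- > _ <- writeJobToDisk rootdir job
-- > replicateManyJobs rootdir mastercandidates [job]
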
-- | Read the job serial number from disk.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk = do
  filename <- jobQueueSerialFile
  tryAndLogIOError (readFile filename) "Failed to read serial file"
                   (makeJobIdS . rStripSpace)

-- | Allocate new job ids.
-- To avoid races while accessing the serial file, the threads synchronize
-- over a lock, provided as usual by an MVar.
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n =
  if n <= 0
    then return . Bad $ "Can only allocate positive number of job ids"
    else do
      takeMVar lock
      rjobid <- readSerialFromDisk
      case rjobid of
        Bad s -> do
          putMVar lock ()
          return . Bad $ s
        Ok jid -> do
          let current = fromJobId jid
              serial_content = show (current + n) ++ "\n"
          serial <- jobQueueSerialFile
          write_result <- try $ atomicWriteFile serial serial_content
                          :: IO (Either IOError ())
          case write_result of
            Left e -> do
              putMVar lock ()
              let msg = "Failed to write serial file: " ++ show e
              logError msg
              return . Bad $ msg
            Right () -> do
              _ <- executeRpcCall mastercandidates
                     $ RpcCallJobqueueUpdate serial serial_content
              putMVar lock ()
              return $ mapM makeJobId [(current+1)..(current+n)]

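-- Allocation sketch: all callers are expected to share a single lock 'MVar'
-- so that concurrent allocations serialise on the serial file (names below
-- are illustrative):
--
-- > lock <- newMVar ()
-- > rids <- allocateJobIds mastercandidates lock 3
-- > -- rids holds three consecutive new job ids on success
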
-- | Allocate one new job id.
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
allocateJobId mastercandidates lock = do
  jids <- allocateJobIds mastercandidates lock 1
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")

-- | Decide whether the job queue is open.
isQueueOpen :: IO Bool
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)

-- | Start enqueued jobs, currently by handing them over to masterd.
startJobs :: [QueuedJob] -> IO ()
startJobs jobs = do
  socketpath <- defaultMasterSocket
  client <- getLuxiClient socketpath
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
  let failures = map show $ justBad pickupResults
  unless (null failures)
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures
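
-- End-to-end sketch of the submission path built from the pieces above
-- (the bindings @mastercandidates@, @lock@, @rootdir@ and @mops@ are
-- illustrative, and error handling is elided):
--
-- > Ok jid <- allocateJobId mastercandidates lock
-- > job    <- queuedJobFromOpCodes jid mops
-- > _      <- writeJobToDisk rootdir job
-- > replicateManyJobs rootdir mastercandidates [job]
-- > startJobs [job]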