{-# LANGUAGE TemplateHaskell #-}

{-| Implementation of the job queue.

-}

{-

Copyright (C) 2010, 2012 Google Inc.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.

-}

module Ganeti.JQueue
    ( QueuedOpCode(..)
    , QueuedJob(..)
    , InputOpCode(..)
    , queuedOpCodeFromMetaOpCode
    , queuedJobFromOpCodes
    , Timestamp
    , noTimestamp
    , currentTimestamp
    , setReceivedTimestamp
    , opStatusFinalized
    , extractOpSummary
    , calcJobStatus
    , jobStarted
    , jobFinalized
    , calcJobPriority
    , jobFileName
    , liveJobFile
    , archivedJobFile
    , determineJobDirectories
    , getJobIDs
    , sortJobIDs
    , loadJobFromDisk
    , noSuchJob
    , readSerialFromDisk
    , allocateJobIds
    , allocateJobId
    , writeJobToDisk
    , replicateManyJobs
    , isQueueOpen
    , startJobs
    ) where

import Control.Arrow (second)
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Data.Functor ((<$))
import Data.List
import Data.Maybe
import Data.Ord (comparing)
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
import Prelude hiding (id, log)
import System.Directory
import System.FilePath
import System.IO.Error (isDoesNotExistError)
import System.Posix.Files
import System.Time
import qualified Text.JSON
import Text.JSON.Types

import Ganeti.BasicTypes
import qualified Ganeti.Constants as C
import Ganeti.JSON
import Ganeti.Logging
import Ganeti.Luxi
import Ganeti.Objects (Node)
import Ganeti.OpCodes
import Ganeti.Path
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
                   RpcCallJobqueueUpdate(..))
import Ganeti.THH
import Ganeti.Types
import Ganeti.Utils

-- * Data types

-- | The ganeti queue timestamp type. It represents the time as the pair
-- of seconds since the epoch and microseconds since the beginning of the
-- second.
type Timestamp = (Int, Int)

-- | Missing timestamp type.
noTimestamp :: Timestamp
noTimestamp = (-1, -1)

-- | Get the current time in the job-queue timestamp format.
currentTimestamp :: IO Timestamp
currentTimestamp = do
  TOD ctime pico <- getClockTime
  return (fromIntegral ctime, fromIntegral $ pico `div` 1000000)
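-- For illustration, a value such as (1411234567, 256742) denotes 256742
-- microseconds after second 1411234567 of the Unix epoch, while 'noTimestamp'
-- is the conventional "not set" marker.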

-- | An input opcode.
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
                   deriving (Show, Eq)

-- | JSON instance for 'InputOpCode', trying to parse it and if
-- failing, keeping the original JSValue.
instance Text.JSON.JSON InputOpCode where
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
  showJSON (InvalidOpCode inv) = inv
  readJSON v = case Text.JSON.readJSON v of
                 Text.JSON.Error _ -> return $ InvalidOpCode v
                 Text.JSON.Ok mo -> return $ ValidOpCode mo

-- | Invalid opcode summary.
invalidOp :: String
invalidOp = "INVALID_OP"

-- | Tries to extract the opcode summary from an 'InputOpCode'. This
-- duplicates some functionality from the 'opSummary' function in
-- "Ganeti.OpCodes".
extractOpSummary :: InputOpCode -> String
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
extractOpSummary (InvalidOpCode (JSObject o)) =
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
    Just s -> drop 3 s -- drop the OP_ prefix
    Nothing -> invalidOp
extractOpSummary _ = invalidOp
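-- For illustration: an invalid opcode that still carries an "OP_ID" field,
-- say the JSON object {"OP_ID": "OP_INSTANCE_CREATE"}, is summarized as
-- "INSTANCE_CREATE" (the "OP_" prefix is dropped), while anything without a
-- usable "OP_ID" falls back to "INVALID_OP".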

$(buildObject "QueuedOpCode" "qo"
  [ simpleField "input"           [t| InputOpCode |]
  , simpleField "status"          [t| OpStatus    |]
  , simpleField "result"          [t| JSValue     |]
  , defaultField [| [] |] $
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
  , simpleField "priority"        [t| Int         |]
  , optionalNullSerField $
    simpleField "start_timestamp" [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "exec_timestamp"  [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "end_timestamp"   [t| Timestamp   |]
  ])

$(buildObject "QueuedJob" "qj"
  [ simpleField "id"                 [t| JobId          |]
  , simpleField "ops"                [t| [QueuedOpCode] |]
  , optionalNullSerField $
    simpleField "received_timestamp" [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "start_timestamp"    [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "end_timestamp"      [t| Timestamp      |]
  ])
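-- The two Template Haskell splices above generate plain records together with
-- JSON (de)serialization code; roughly, 'QueuedOpCode' gets fields 'qoInput',
-- 'qoStatus', 'qoResult', 'qoLog', 'qoPriority', 'qoStartTimestamp',
-- 'qoExecTimestamp' and 'qoEndTimestamp', and 'QueuedJob' gets 'qjId',
-- 'qjOps', 'qjReceivedTimestamp', 'qjStartTimestamp' and 'qjEndTimestamp'
-- (the field prefix is the second argument to 'buildObject'). These accessor
-- names are the ones used by the constructor functions below.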

-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode.
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode { qoInput = ValidOpCode op
               , qoStatus = OP_STATUS_QUEUED
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
                              $ op
               , qoLog = []
               , qoResult = JSNull
               , qoStartTimestamp = Nothing
               , qoEndTimestamp = Nothing
               , qoExecTimestamp = Nothing
               }

-- | Creates a job from a job id and a list of opcodes. This is the
-- pure part of job creation, as allocating a new job id lives in IO.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops = do
  ops' <- mapM (`resolveDependencies` jobid) ops
  return QueuedJob { qjId = jobid
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
                   , qjReceivedTimestamp = Nothing
                   , qjStartTimestamp = Nothing
                   , qjEndTimestamp = Nothing
                   }

-- | Attaches a received timestamp to a QueuedJob.
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }

-- | Job file prefix.
jobFilePrefix :: String
jobFilePrefix = "job-"

-- | Computes the filename for a given job ID.
jobFileName :: JobId -> FilePath
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)

-- | Parses a job ID from a file name.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  case stripPrefix jobFilePrefix path of
    Nothing -> fail $ "Job file '" ++ path ++
                      "' doesn't have the correct prefix"
    Just suffix -> makeJobIdS suffix
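-- For illustration, the naming scheme above is reversible (examples in the
-- 'Result' monad):
--
-- > fmap jobFileName (makeJobId 42)           -- Ok "job-42"
-- > parseJobFileId "job-42" :: Result JobId   -- Ok of job id 42
-- > parseJobFileId "foo-42" :: Result JobId   -- Bad, wrong prefix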

-- | Computes the full path to a live job.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir jid = rootdir </> jobFileName jid

-- | Computes the full path to an archived job.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
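-- For illustration, archived jobs are bucketed by dividing the numeric job id
-- by C.jstoreJobsPerArchiveDirectory; assuming the usual bucket size of 10000,
-- job 123456 ends up as "job-123456" inside subdirectory "12" of the archive
-- directory.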

-- | Map from opcode status to job status.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR

-- | Computes a queued job's status.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
    where
      terminalStatus OP_STATUS_ERROR     = True
      terminalStatus OP_STATUS_CANCELING = True
      terminalStatus OP_STATUS_CANCELED  = True
      terminalStatus _                   = False
      softStatus     OP_STATUS_SUCCESS   = True
      softStatus     OP_STATUS_QUEUED    = True
      softStatus     _                   = False
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
      extractOpSt [] d False = d
      extractOpSt (x:xs) d old_all
           | terminalStatus x = opStatusToJob x -- abort recursion
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
           where new_all = x == OP_STATUS_SUCCESS && old_all
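-- For illustration, walking the opcode statuses from left to right: an
-- all-SUCCESS list gives JOB_STATUS_SUCCESS; [SUCCESS, RUNNING, QUEUED]
-- gives JOB_STATUS_RUNNING (the last non-soft, non-terminal status seen);
-- any ERROR, CANCELING or CANCELED status is terminal and decides the job
-- status immediately; and a job whose opcodes are all still QUEUED stays
-- JOB_STATUS_QUEUED.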

-- | Determine if a job has started.
jobStarted :: QueuedJob -> Bool
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus

-- | Determine if a job is finalized.
jobFinalized :: QueuedJob -> Bool
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus

-- | Determine whether an opcode status is finalized.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized = (> OP_STATUS_RUNNING)

-- | Compute a job's priority.
calcJobPriority :: QueuedJob -> Int
calcJobPriority QueuedJob { qjOps = ops } =
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
    where helper [] = C.opPrioDefault
          helper ps = minimum ps
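-- For illustration, only opcodes that are not yet finalized count: a job
-- whose pending opcodes have priorities [0, -10] gets priority -10 (the
-- numerically smallest value, i.e. the most urgent one, since lower numbers
-- mean higher priority), and a job with no pending opcodes falls back to
-- C.opPrioDefault.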

-- | Log but ignore an 'IOError'.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  unless (isDoesNotExistError e && ignore_noent) .
    logWarning $ msg ++ ": " ++ show e
  return a

-- | Compute the list of existing archive directories. Note that I/O
-- exceptions are swallowed and ignored.
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  filterM (\path ->
             liftM isDirectory (getFileStatus (adir </> path))
               `Control.Exception.catch`
               ignoreIOError False True
                 ("Failed to stat archive path " ++ path)) fpaths

-- | Build list of directories containing job files. Note: compared to
-- the Python version, this doesn't ignore a potential lost+found
-- file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived = do
  other <- if archived
             then allArchiveDirs rootdir
             else return []
  return $ rootdir:other

-- | Function equivalent to the \'sequence\' function, which cannot be used
-- because of a library version conflict on Lucid.
-- FIXME: delete this and just use \'sequence\' instead once Lucid
-- compatibility is no longer required.
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
sequencer l = fmap reverse $ foldl seqFolder (Right []) l

-- | Folding function for joining multiple [JobId] lists into one list.
seqFolder :: Either IOError [[JobId]]
          -> Either IOError [JobId]
          -> Either IOError [[JobId]]
seqFolder (Left e) _ = Left e
seqFolder (Right _) (Left e) = Left e
seqFolder (Right l) (Right el) = Right $ el:l
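-- For illustration, 'sequencer' behaves like 'sequence' specialized to
-- 'Either IOError', short-circuiting on the first failure:
--
-- > sequencer [Right ids1, Right ids2]   -- Right [ids1, ids2]
-- > sequencer [Right ids1, Left err]     -- Left err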

-- | Computes the list of all jobs in the given directories.
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)

-- | Sorts a list of job IDs.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy (comparing fromJobId)

-- | Computes the list of jobs in a given directory.
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
getDirJobIDs path = do
  either_contents <-
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
  case either_contents of
    Left e -> do
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
      return $ Left e
    Right contents -> do
      let jids = foldl (\ids file ->
                         case parseJobFileId file of
                           Nothing -> ids
                           Just new_id -> new_id:ids) [] contents
      return . Right $ reverse jids

-- | Reads the job data from disk.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid = do
  let live_path = liveJobFile rootdir jid
      archived_path = archivedJobFile rootdir jid
      all_paths = if archived
                    then [(live_path, False), (archived_path, True)]
                    else [(live_path, False)]
  foldM (\state (path, isarchived) ->
           liftM (\r -> Just (r, isarchived)) (readFile path)
             `Control.Exception.catch`
             ignoreIOError state True
               ("Failed to read job file " ++ path)) Nothing all_paths

-- | Failed to load job error.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"

-- | Loads a job from disk; the boolean in the result signals whether the
-- job was read from the archive.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: we need some strictness below, otherwise the wrapping in a
  -- Result will create too much laziness, and not close the file
  -- descriptors for the individual jobs
  return $! case raw of
             Nothing -> noSuchJob
             Just (str, arch) ->
               liftM (\qj -> (qj, arch)) .
               fromJResult "Parsing job file" $ Text.JSON.decode str

-- | Write a job to disk.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
writeJobToDisk rootdir job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  tryAndLogIOError (atomicWriteFile filename content)
                   ("Failed to write " ++ filename) Ok

-- | Replicate a job to all master candidates.
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
replicateJob rootdir mastercandidates job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  callresult <- executeRpcCall mastercandidates
                  $ RpcCallJobqueueUpdate filename content
  let result = map (second (() <$)) callresult
  logRpcErrors result
  return result

-- | Replicate many jobs to all master candidates.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates =
  mapM_ (replicateJob rootdir mastercandidates)

-- | Read the job serial number from disk.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk = do
  filename <- jobQueueSerialFile
  tryAndLogIOError (readFile filename) "Failed to read serial file"
                   (makeJobIdS . rStripSpace)

-- | Allocate new job ids.
-- To avoid races while accessing the serial file, the threads synchronize
-- over a lock, provided as usual by an MVar.
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n =
  if n <= 0
    then return . Bad $ "Can only allocate a positive number of job ids"
    else do
      takeMVar lock
      rjobid <- readSerialFromDisk
      case rjobid of
        Bad s -> do
          putMVar lock ()
          return . Bad $ s
        Ok jid -> do
          let current = fromJobId jid
              serial_content = show (current + n) ++  "\n"
          serial <- jobQueueSerialFile
          write_result <- try $ atomicWriteFile serial serial_content
                          :: IO (Either IOError ())
          case write_result of
            Left e -> do
              putMVar lock ()
              let msg = "Failed to write serial file: " ++ show e
              logError msg
              return . Bad $ msg
            Right () -> do
              _ <- executeRpcCall mastercandidates
                     $ RpcCallJobqueueUpdate serial serial_content
              putMVar lock ()
              return $ mapM makeJobId [(current+1)..(current+n)]
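-- A rough usage sketch: the lock is a shared 'MVar ()' created once at
-- startup, and 'candidates' is the current list of master candidate nodes
-- (both assumed to be supplied by the caller):
--
-- > lock <- newMVar ()
-- > rids <- allocateJobIds candidates lock 3
-- > -- rids is either Ok with three consecutive fresh job ids, or Bad msg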

-- | Allocate one new job id.
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
allocateJobId mastercandidates lock = do
  jids <- allocateJobIds mastercandidates lock 1
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")

-- | Decide whether the job queue is open.
isQueueOpen :: IO Bool
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)

-- | Start enqueued jobs, currently by handing them over to masterd.
startJobs :: [QueuedJob] -> IO ()
startJobs jobs = do
  socketpath <- defaultMasterSocket
  client <- getLuxiClient socketpath
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
  let failures = map show $ justBad pickupResults
  unless (null failures)
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures
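-- A rough end-to-end submission sketch built from the primitives above;
-- 'candidates', 'lock' and 'rootdir' (the queue directory) are assumed to be
-- supplied by the caller:
--
-- > submitJob :: [Node] -> MVar () -> FilePath -> [MetaOpCode] -> IO (Result JobId)
-- > submitJob candidates lock rootdir ops = do
-- >   rjid <- allocateJobId candidates lock
-- >   case rjid of
-- >     Bad msg -> return $ Bad msg
-- >     Ok jid  -> do
-- >       job <- queuedJobFromOpCodes jid ops
-- >       now <- currentTimestamp
-- >       let job' = setReceivedTimestamp now job
-- >       _ <- writeJobToDisk rootdir job'
-- >       replicateManyJobs rootdir candidates [job']
-- >       startJobs [job']
-- >       return $ Ok jid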