root / src / Ganeti / JQueue.hs @ 2af22d70
{-# LANGUAGE TemplateHaskell #-}

{-| Implementation of the job queue.

-}

{-

Copyright (C) 2010, 2012 Google Inc.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.

-}

module Ganeti.JQueue
    ( QueuedOpCode(..)
    , QueuedJob(..)
    , InputOpCode(..)
    , queuedOpCodeFromMetaOpCode
    , queuedJobFromOpCodes
    , Timestamp
    , noTimestamp
    , currentTimestamp
    , setReceivedTimestamp
    , opStatusFinalized
    , extractOpSummary
    , calcJobStatus
    , calcJobPriority
    , jobFileName
    , liveJobFile
    , archivedJobFile
    , determineJobDirectories
    , getJobIDs
    , sortJobIDs
    , loadJobFromDisk
    , noSuchJob
    , readSerialFromDisk
    , allocateJobIds
    , allocateJobId
    , writeJobToDisk
    , replicateManyJobs
    , isQueueOpen
    , enqueueJobs
    ) where

import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Data.List
import Data.Maybe
import Data.Ord (comparing)
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
import Prelude hiding (log, id)
import System.Directory
import System.FilePath
import System.IO.Error (isDoesNotExistError)
import System.Posix.Files
import System.Time
import qualified Text.JSON
import Text.JSON.Types

import Ganeti.BasicTypes
import qualified Ganeti.Constants as C
import Ganeti.JSON
import Ganeti.Logging
import Ganeti.Luxi
import Ganeti.Objects (Node)
import Ganeti.OpCodes
import Ganeti.Path
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
                   RpcCallJobqueueUpdate(..))
import Ganeti.THH
import Ganeti.Types
import Ganeti.Utils

-- * Data types

-- | The ganeti queue timestamp type. It represents the time as the pair
-- of seconds since the epoch and microseconds since the beginning of the
-- second.
type Timestamp = (Int, Int)

-- | Missing timestamp type.
noTimestamp :: Timestamp
noTimestamp = (-1, -1)

-- | Get the current time in the job-queue timestamp format.
currentTimestamp :: IO Timestamp
currentTimestamp = do
  TOD ctime pico <- getClockTime
  return (fromIntegral ctime, fromIntegral $ pico `div` 1000000)
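
-- Illustrative helper (added for this write-up, not part of the original
-- file): render a queue timestamp for human consumption, treating
-- 'noTimestamp' specially. The name is hypothetical.
showTimestamp :: Timestamp -> String
showTimestamp ts@(sec, usec)
  | ts == noTimestamp = "<unset>"
  | otherwise         = show sec ++ "." ++ show usec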

-- | An input opcode.
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
                   deriving (Show, Eq)

-- | JSON instance for 'InputOpCode', trying to parse it and, if
-- parsing fails, keeping the original JSValue.
instance Text.JSON.JSON InputOpCode where
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
  showJSON (InvalidOpCode inv) = inv
  readJSON v = case Text.JSON.readJSON v of
                 Text.JSON.Error _ -> return $ InvalidOpCode v
                 Text.JSON.Ok mo -> return $ ValidOpCode mo

-- | Invalid opcode summary.
invalidOp :: String
invalidOp = "INVALID_OP"

-- | Tries to extract the opcode summary from an 'InputOpCode'. This
-- duplicates some functionality from the 'opSummary' function in
-- "Ganeti.OpCodes".
extractOpSummary :: InputOpCode -> String
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
extractOpSummary (InvalidOpCode (JSObject o)) =
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
    Just s -> drop 3 s -- drop the OP_ prefix
    Nothing -> invalidOp
extractOpSummary _ = invalidOp
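
-- Illustrative sketch (added here, not part of the original file):
-- decoding raw JSON into an 'InputOpCode' and summarizing it. An object
-- that does not parse as a proper opcode but carries an "OP_ID" of, say,
-- "OP_INSTANCE_CREATE" summarizes to "INSTANCE_CREATE" (the OP_ prefix
-- is dropped); syntactically broken JSON or objects without a usable
-- OP_ID end up as "INVALID_OP". The helper name is hypothetical.
summaryOfRawOp :: String -> String
summaryOfRawOp raw =
  case Text.JSON.decode raw of
    Text.JSON.Ok op   -> extractOpSummary op
    Text.JSON.Error _ -> invalidOp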

$(buildObject "QueuedOpCode" "qo"
  [ simpleField "input"           [t| InputOpCode |]
  , simpleField "status"          [t| OpStatus    |]
  , simpleField "result"          [t| JSValue     |]
  , defaultField [| [] |] $
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
  , simpleField "priority"        [t| Int         |]
  , optionalNullSerField $
    simpleField "start_timestamp" [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "exec_timestamp"  [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "end_timestamp"   [t| Timestamp   |]
  ])

$(buildObject "QueuedJob" "qj"
  [ simpleField "id"                 [t| JobId          |]
  , simpleField "ops"                [t| [QueuedOpCode] |]
  , optionalNullSerField $
    simpleField "received_timestamp" [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "start_timestamp"    [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "end_timestamp"      [t| Timestamp      |]
  ])
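
-- For orientation (comment added here; an approximation, not the exact
-- Template Haskell output): the splice above expands to a plain record
-- plus JSON (de)serialization code, roughly
--
-- > data QueuedJob = QueuedJob
-- >   { qjId                :: JobId
-- >   , qjOps               :: [QueuedOpCode]
-- >   , qjReceivedTimestamp :: Maybe Timestamp
-- >   , qjStartTimestamp    :: Maybe Timestamp
-- >   , qjEndTimestamp      :: Maybe Timestamp
-- >   }
--
-- where the 'optionalNullSerField' fields are serialized as JSON null
-- when they are unset.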

-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode { qoInput = ValidOpCode op
               , qoStatus = OP_STATUS_QUEUED
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
                              $ op
               , qoLog = []
               , qoResult = JSNull
               , qoStartTimestamp = Nothing
               , qoEndTimestamp = Nothing
               , qoExecTimestamp = Nothing
               }

-- | From a job-id and a list of op-codes create a job. This is
-- the pure part of job creation, as allocating a new job id
-- lives in IO.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops = do
  ops' <- mapM (`resolveDependencies` jobid) ops
  return QueuedJob { qjId = jobid
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
                   , qjReceivedTimestamp = Nothing
                   , qjStartTimestamp = Nothing
                   , qjEndTimestamp = Nothing
                   }

-- | Attach a received timestamp to a Queued Job.
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }
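
-- Illustrative sketch (added here, not part of the original file): the
-- usual way the two functions above combine: build the pure job from its
-- opcodes, then stamp it with the time it was received. The helper name
-- is hypothetical.
buildReceivedJob :: JobId -> [MetaOpCode] -> IO QueuedJob
buildReceivedJob jid ops = do
  now <- currentTimestamp
  job <- queuedJobFromOpCodes jid ops
  return $ setReceivedTimestamp now job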

-- | Job file prefix.
jobFilePrefix :: String
jobFilePrefix = "job-"

-- | Computes the filename for a given job ID.
jobFileName :: JobId -> FilePath
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)

-- | Parses a job ID from a file name.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  case stripPrefix jobFilePrefix path of
    Nothing -> fail $ "Job file '" ++ path ++
                      "' doesn't have the correct prefix"
    Just suffix -> makeJobIdS suffix
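
-- Illustrative sketch (added here, not part of the original file):
-- 'jobFileName' and 'parseJobFileId' are inverses for well-formed names,
-- so e.g. the file name for job 42, "job-42", parses back to job id 42.
-- The helper name is hypothetical.
jobFileNameRoundTrips :: JobId -> Maybe JobId
jobFileNameRoundTrips = parseJobFileId . jobFileName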

-- | Computes the full path to a live job.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir jid = rootdir </> jobFileName jid

-- | Computes the full path to an archived job.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
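
-- For example (comment only; the concrete numbers depend on the values
-- defined in "Ganeti.Constants" and "Ganeti.Path"): if
-- 'C.jstoreJobsPerArchiveDirectory' is 10000 and 'jobQueueArchiveSubDir'
-- is "archive", job 12345 under "/var/lib/ganeti/queue" would end up at
-- "/var/lib/ganeti/queue/archive/1/job-12345".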

-- | Map from opcode status to job status.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR

-- | Computes a queued job's status.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
    where
      terminalStatus OP_STATUS_ERROR     = True
      terminalStatus OP_STATUS_CANCELING = True
      terminalStatus OP_STATUS_CANCELED  = True
      terminalStatus _                   = False
      softStatus     OP_STATUS_SUCCESS   = True
      softStatus     OP_STATUS_QUEUED    = True
      softStatus     _                   = False
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
      extractOpSt [] d False = d
      extractOpSt (x:xs) d old_all
           | terminalStatus x = opStatusToJob x -- abort recursion
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
           where new_all = x == OP_STATUS_SUCCESS && old_all
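
-- A few worked cases of the fold above (comment added here, not part of
-- the original file): opcode statuses [SUCCESS, SUCCESS] give
-- JOB_STATUS_SUCCESS; [SUCCESS, ERROR, QUEUED] aborts at the error and
-- gives JOB_STATUS_ERROR; [SUCCESS, QUEUED] stays JOB_STATUS_QUEUED,
-- since the queued opcode resets the all-successful flag without
-- supplying a terminal status.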

-- | Determine whether an opcode status is finalized.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized = (> OP_STATUS_RUNNING)

-- | Compute a job's priority.
calcJobPriority :: QueuedJob -> Int
calcJobPriority QueuedJob { qjOps = ops } =
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
    where helper [] = C.opPrioDefault
          helper ps = minimum ps
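
-- Worked example (comment added here, not part of the original file): a
-- job with unfinished opcodes at priorities [0, -20, 10] gets priority
-- -20, i.e. the most urgent pending opcode wins (lower values mean
-- higher priority in Ganeti); once no unfinished opcodes remain, the
-- default 'C.opPrioDefault' is used.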

-- | Log but ignore an 'IOError'.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  unless (isDoesNotExistError e && ignore_noent) .
    logWarning $ msg ++ ": " ++ show e
  return a

-- | Compute the list of existing archive directories. Note that I/O
-- exceptions are swallowed and ignored.
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  filterM (\path ->
             liftM isDirectory (getFileStatus (adir </> path))
               `Control.Exception.catch`
               ignoreIOError False True
                 ("Failed to stat archive path " ++ path)) fpaths

-- | Build list of directories containing job files. Note: compared to
-- the Python version, this doesn't ignore a potential lost+found
-- file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived = do
  other <- if archived
             then allArchiveDirs rootdir
             else return []
  return $ rootdir:other

-- Function equivalent to the \'sequence\' function, which cannot be used
-- because of a library version conflict on Lucid.
-- FIXME: delete this and just use \'sequence\' instead once Lucid
-- compatibility is no longer required.
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
sequencer l = fmap reverse $ foldl seqFolder (Right []) l

-- | Folding function for joining multiple [JobIds] into one list.
seqFolder :: Either IOError [[JobId]]
          -> Either IOError [JobId]
          -> Either IOError [[JobId]]
seqFolder (Left e) _ = Left e
seqFolder (Right _) (Left e) = Left e
seqFolder (Right l) (Right el) = Right $ el:l

-- | Computes the list of all jobs in the given directories.
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)

-- | Sorts the list of job IDs.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy (comparing fromJobId)

-- | Computes the list of jobs in a given directory.
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
getDirJobIDs path = do
  either_contents <-
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
  case either_contents of
    Left e -> do
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
      return $ Left e
    Right contents -> do
      let jids = foldl (\ids file ->
                         case parseJobFileId file of
                           Nothing -> ids
                           Just new_id -> new_id:ids) [] contents
      return . Right $ reverse jids

-- | Reads the job data from disk.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid = do
  let live_path = liveJobFile rootdir jid
      archived_path = archivedJobFile rootdir jid
      all_paths = if archived
                    then [(live_path, False), (archived_path, True)]
                    else [(live_path, False)]
  foldM (\state (path, isarchived) ->
           liftM (\r -> Just (r, isarchived)) (readFile path)
             `Control.Exception.catch`
             ignoreIOError state True
               ("Failed to read job file " ++ path)) Nothing all_paths

-- | Failed to load job error.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"

-- | Loads a job from disk.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: we need some strictness below, otherwise the wrapping in a
  -- Result will create too much laziness, and not close the file
  -- descriptors for the individual jobs
  return $! case raw of
             Nothing -> noSuchJob
             Just (str, arch) ->
               liftM (\qj -> (qj, arch)) .
               fromJResult "Parsing job file" $ Text.JSON.decode str
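
-- Illustrative sketch (added here, not part of the original file): load
-- a job and render its computed status; the Bool returned alongside the
-- job says whether it came from the archive rather than the live queue.
-- The helper name is hypothetical.
describeJob :: FilePath -> JobId -> IO String
describeJob rootdir jid = do
  result <- loadJobFromDisk rootdir True jid
  return $ case result of
    Bad msg        -> "cannot load job: " ++ msg
    Ok (job, arch) -> show (calcJobStatus job) ++
                      (if arch then " (archived)" else "")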

-- | Write a job to disk.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
writeJobToDisk rootdir job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  tryAndLogIOError (atomicWriteFile filename content)
                   ("Failed to write " ++ filename) Ok

-- | Replicate a job to all master candidates.
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
replicateJob rootdir mastercandidates job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  result <- executeRpcCall mastercandidates
              $ RpcCallJobqueueUpdate filename content
  logRpcErrors result
  return result

-- | Replicate many jobs to all master candidates.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates =
  mapM_ (replicateJob rootdir mastercandidates)

-- | Read the job serial number from disk.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk = do
  filename <- jobQueueSerialFile
  tryAndLogIOError (readFile filename) "Failed to read serial file"
                   (makeJobIdS . rStripSpace)

-- | Allocate new job ids.
-- To avoid races while accessing the serial file, the threads synchronize
-- over a lock, provided as usual by an MVar.
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n =
  if n <= 0
    then return . Bad $ "Can only allocate positive number of job ids"
    else do
      takeMVar lock
      rjobid <- readSerialFromDisk
      case rjobid of
        Bad s -> do
          putMVar lock ()
          return . Bad $ s
        Ok jid -> do
          let current = fromJobId jid
              serial_content = show (current + n) ++ "\n"
          serial <- jobQueueSerialFile
          write_result <- try $ atomicWriteFile serial serial_content
                          :: IO (Either IOError ())
          case write_result of
            Left e -> do
              putMVar lock ()
              let msg = "Failed to write serial file: " ++ show e
              logError msg
              return . Bad $ msg
            Right () -> do
              _ <- executeRpcCall mastercandidates
                     $ RpcCallJobqueueUpdate serial serial_content
              putMVar lock ()
              return $ mapM makeJobId [(current+1)..(current+n)]

-- | Allocate one new job id.
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
allocateJobId mastercandidates lock = do
  jids <- allocateJobIds mastercandidates lock 1
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")

-- | Decide whether the job queue is open.
isQueueOpen :: IO Bool
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)

-- | Enqueue jobs. This will guarantee that jobs get executed eventually.
-- Currently, the implementation is to unconditionally hand over the jobs
-- to masterd.
enqueueJobs :: [QueuedJob] -> IO ()
enqueueJobs jobs = do
  socketpath <- defaultMasterSocket
  client <- getClient socketpath
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
  let failures = map show $ justBad pickupResults
  unless (null failures)
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures
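
-- Illustrative end-to-end sketch (added here, not part of the original
-- file): submit a single job built from the given opcodes. The queue
-- directory, the list of master candidates and the job-id lock are
-- assumed to be supplied by the caller; error handling is kept minimal
-- and the helper name is hypothetical.
submitJob :: FilePath -> [Node] -> MVar () -> [MetaOpCode]
          -> IO (Result JobId)
submitJob rootdir mastercandidates lock ops = do
  rjid <- allocateJobId mastercandidates lock
  case rjid of
    Bad msg -> return $ Bad msg
    Ok jid  -> do
      now <- currentTimestamp
      job <- liftM (setReceivedTimestamp now) (queuedJobFromOpCodes jid ops)
      written <- writeJobToDisk rootdir job
      case written of
        Bad msg -> return $ Bad msg
        Ok ()   -> do
          replicateManyJobs rootdir mastercandidates [job]
          enqueueJobs [job]
          return $ Ok jid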