{-# LANGUAGE TemplateHaskell #-}

{-| Implementation of the job queue.

-}

{-

Copyright (C) 2010, 2012 Google Inc.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.

-}

module Ganeti.JQueue
    ( QueuedOpCode(..)
    , QueuedJob(..)
    , InputOpCode(..)
    , queuedOpCodeFromMetaOpCode
    , queuedJobFromOpCodes
    , Timestamp
    , noTimestamp
    , currentTimestamp
    , setReceivedTimestamp
    , opStatusFinalized
    , extractOpSummary
    , calcJobStatus
    , jobFinalized
    , calcJobPriority
    , jobFileName
    , liveJobFile
    , archivedJobFile
    , determineJobDirectories
    , getJobIDs
    , sortJobIDs
    , loadJobFromDisk
    , noSuchJob
    , readSerialFromDisk
    , allocateJobIds
    , allocateJobId
    , writeJobToDisk
    , replicateManyJobs
    , isQueueOpen
    , startJobs
    ) where

import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Data.List
import Data.Maybe
import Data.Ord (comparing)
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
import Prelude hiding (log, id)
import System.Directory
import System.FilePath
import System.IO.Error (isDoesNotExistError)
import System.Posix.Files
import System.Time
import qualified Text.JSON
import Text.JSON.Types

import Ganeti.BasicTypes
import qualified Ganeti.Constants as C
import Ganeti.JSON
import Ganeti.Logging
import Ganeti.Luxi
import Ganeti.Objects (Node)
import Ganeti.OpCodes
import Ganeti.Path
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
                   RpcCallJobqueueUpdate(..))
import Ganeti.THH
import Ganeti.Types
import Ganeti.Utils

-- * Data types

-- | The ganeti queue timestamp type. It represents the time as the pair
-- of seconds since the epoch and microseconds since the beginning of the
-- second.
type Timestamp = (Int, Int)

-- | Missing timestamp type.
noTimestamp :: Timestamp
noTimestamp = (-1, -1)

-- | Get the current time in the job-queue timestamp format.
currentTimestamp :: IO Timestamp
currentTimestamp = do
  TOD ctime pico <- getClockTime
  return (fromIntegral ctime, fromIntegral $ pico `div` 1000000)

-- | An input opcode.
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
                   deriving (Show, Eq)

-- | JSON instance for 'InputOpCode', trying to parse it and if
-- failing, keeping the original JSValue.
instance Text.JSON.JSON InputOpCode where
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
  showJSON (InvalidOpCode inv) = inv
  readJSON v = case Text.JSON.readJSON v of
                 Text.JSON.Error _ -> return $ InvalidOpCode v
                 Text.JSON.Ok mo -> return $ ValidOpCode mo

-- | Invalid opcode summary.
invalidOp :: String
invalidOp = "INVALID_OP"

-- | Tries to extract the opcode summary from an 'InputOpCode'. This
-- duplicates some functionality from the 'opSummary' function in
-- "Ganeti.OpCodes".
extractOpSummary :: InputOpCode -> String
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
extractOpSummary (InvalidOpCode (JSObject o)) =
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
    Just s -> drop 3 s -- drop the OP_ prefix
    Nothing -> invalidOp
extractOpSummary _ = invalidOp

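-- A couple of illustrative reductions of 'extractOpSummary' (a sketch, not
-- part of the original module; 'toJSObject' comes from "Text.JSON.Types"):
--
-- > extractOpSummary (InvalidOpCode JSNull) == "INVALID_OP"
-- > extractOpSummary (InvalidOpCode . JSObject $ toJSObject
-- >     [("OP_ID", Text.JSON.showJSON "OP_INSTANCE_CREATE")])
-- >   == "INSTANCE_CREATE"
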
$(buildObject "QueuedOpCode" "qo"
  [ simpleField "input"           [t| InputOpCode |]
  , simpleField "status"          [t| OpStatus    |]
  , simpleField "result"          [t| JSValue     |]
  , defaultField [| [] |] $
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
  , simpleField "priority"        [t| Int         |]
  , optionalNullSerField $
    simpleField "start_timestamp" [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "exec_timestamp"  [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "end_timestamp"   [t| Timestamp   |]
  ])

$(buildObject "QueuedJob" "qj"
  [ simpleField "id"                 [t| JobId          |]
  , simpleField "ops"                [t| [QueuedOpCode] |]
  , optionalNullSerField $
    simpleField "received_timestamp" [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "start_timestamp"    [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "end_timestamp"      [t| Timestamp      |]
  ])

-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode.
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode { qoInput = ValidOpCode op
               , qoStatus = OP_STATUS_QUEUED
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
                              $ op
               , qoLog = []
               , qoResult = JSNull
               , qoStartTimestamp = Nothing
               , qoEndTimestamp = Nothing
               , qoExecTimestamp = Nothing
               }

-- | Create a job from a job id and a list of opcodes. This is the
-- pure part of job creation, as allocating a new job id lives in IO.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops = do
  ops' <- mapM (`resolveDependencies` jobid) ops
  return QueuedJob { qjId = jobid
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
                   , qjReceivedTimestamp = Nothing
                   , qjStartTimestamp = Nothing
                   , qjEndTimestamp = Nothing
                   }

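-- For orientation, a possible submission flow built from the helpers in this
-- module (an illustrative sketch only; 'submitSketch' is not part of the
-- module, and the caller is assumed to supply the master candidates, the
-- queue lock and the queue directory):
--
-- > submitSketch :: [Node] -> MVar () -> FilePath -> [MetaOpCode]
-- >              -> IO (Result ())
-- > submitSketch candidates lock rootdir ops = do
-- >   rjid <- allocateJobId candidates lock
-- >   case rjid of
-- >     Bad msg -> return $ Bad msg
-- >     Ok jid  -> do
-- >       now <- currentTimestamp
-- >       job <- liftM (setReceivedTimestamp now)
-- >                    (queuedJobFromOpCodes jid ops)
-- >       res <- writeJobToDisk rootdir job
-- >       replicateManyJobs rootdir candidates [job]
-- >       startJobs [job]
-- >       return res
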
-- | Attach a received timestamp to a QueuedJob.
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }

-- | Job file prefix.
jobFilePrefix :: String
jobFilePrefix = "job-"

-- | Computes the filename for a given job ID.
jobFileName :: JobId -> FilePath
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)

-- | Parses a job ID from a file name.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  case stripPrefix jobFilePrefix path of
    Nothing -> fail $ "Job file '" ++ path ++
                      "' doesn't have the correct prefix"
    Just suffix -> makeJobIdS suffix

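-- Job ids and file names are meant to round-trip through these helpers; for
-- illustration, reading the results in the 'Maybe' monad and assuming
-- 'makeJobId' from "Ganeti.Types" accepts 42:
--
-- > fmap jobFileName (makeJobId 42 :: Maybe JobId) == Just "job-42"
-- > (parseJobFileId "job-42" :: Maybe JobId)       == makeJobId 42
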
-- | Computes the full path to a live job.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir jid = rootdir </> jobFileName jid

-- | Computes the full path to an archived job.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid

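-- For illustration, assuming 'jobQueueArchiveSubDir' is @"archive"@ and
-- 'C.jstoreJobsPerArchiveDirectory' is 10000 (both assumptions of this
-- example), a job @jid@ with id 12345 would be archived as:
--
-- > archivedJobFile "/var/lib/ganeti/queue" jid
-- >   == "/var/lib/ganeti/queue/archive/1/job-12345"
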
-- | Map from opcode status to job status.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR

-- | Computes a queued job's status.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
    where
      terminalStatus OP_STATUS_ERROR     = True
      terminalStatus OP_STATUS_CANCELING = True
      terminalStatus OP_STATUS_CANCELED  = True
      terminalStatus _                   = False
      softStatus     OP_STATUS_SUCCESS   = True
      softStatus     OP_STATUS_QUEUED    = True
      softStatus     _                   = False
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
      extractOpSt [] d False = d
      extractOpSt (x:xs) d old_all
           | terminalStatus x = opStatusToJob x -- abort recursion
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
           where new_all = x == OP_STATUS_SUCCESS && old_all

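-- How the fold above plays out for a few opcode status lists (illustrative
-- only; each list stands for the statuses of a job's opcodes, in order):
--
-- > []                                             ~> JOB_STATUS_SUCCESS
-- > [OP_STATUS_SUCCESS, OP_STATUS_SUCCESS]         ~> JOB_STATUS_SUCCESS
-- > [OP_STATUS_SUCCESS, OP_STATUS_RUNNING, OP_STATUS_QUEUED] ~> JOB_STATUS_RUNNING
-- > [OP_STATUS_SUCCESS, OP_STATUS_ERROR, OP_STATUS_QUEUED]   ~> JOB_STATUS_ERROR
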
-- | Determine if a job is finalized.
jobFinalized :: QueuedJob -> Bool
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus

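-- Both 'jobFinalized' and 'opStatusFinalized' below lean on the derived
-- ordering of the status types: every status that sorts after the running
-- state counts as finalized. This assumes the constructor order in
-- "Ganeti.Types" lists the terminal states (canceled, success, error) last.
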
-- | Determine whether an opcode status is finalized.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized = (> OP_STATUS_RUNNING)

-- | Compute a job's priority.
calcJobPriority :: QueuedJob -> Int
calcJobPriority QueuedJob { qjOps = ops } =
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
    where helper [] = C.opPrioDefault
          helper ps = minimum ps

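-- Numerically smaller priority values are more urgent (as with Unix
-- niceness), so taking the minimum selects the most urgent pending opcode.
-- Sketch:
--
-- > pending opcode priorities [10, -10, 0]  ~> job priority -10
-- > no pending opcodes                      ~> C.opPrioDefault
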
-- | Log but ignore an 'IOError'.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  unless (isDoesNotExistError e && ignore_noent) .
    logWarning $ msg ++ ": " ++ show e
  return a

-- | Compute the list of existing archive directories. Note that I/O
-- exceptions are swallowed and ignored.
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  filterM (\path ->
             liftM isDirectory (getFileStatus (adir </> path))
               `Control.Exception.catch`
               ignoreIOError False True
                 ("Failed to stat archive path " ++ path)) fpaths

-- | Build list of directories containing job files. Note: compared to
-- the Python version, this doesn't ignore a potential lost+found
-- file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived = do
  other <- if archived
             then allArchiveDirs rootdir
             else return []
  return $ rootdir:other

-- Function equivalent to the 'sequence' function, which cannot be used
-- because of a library version conflict on Lucid.
-- FIXME: delete this and just use 'sequence' instead once Lucid
-- compatibility is no longer required.
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
sequencer l = fmap reverse $ foldl seqFolder (Right []) l

-- | Folding function for joining multiple [JobId] lists into one list.
seqFolder :: Either IOError [[JobId]]
          -> Either IOError [JobId]
          -> Either IOError [[JobId]]
seqFolder (Left e) _ = Left e
seqFolder (Right _) (Left e) = Left e
seqFolder (Right l) (Right el) = Right $ el:l

-- | Computes the list of all jobs in the given directories.
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)

-- | Sorts a list of job IDs.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy (comparing fromJobId)

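-- A typical listing of all known job ids, live and archived, chains the
-- helpers above (illustrative sketch; 'listAllJobIds' is not part of the
-- module and 'rootdir' is the queue directory):
--
-- > listAllJobIds :: FilePath -> IO (Either IOError [JobId])
-- > listAllJobIds rootdir = do
-- >   dirs <- determineJobDirectories rootdir True
-- >   liftM (fmap sortJobIDs) (getJobIDs dirs)
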
-- | Computes the list of jobs in a given directory.
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
getDirJobIDs path = do
  either_contents <-
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
  case either_contents of
    Left e -> do
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
      return $ Left e
    Right contents -> do
      let jids = foldl (\ids file ->
                         case parseJobFileId file of
                           Nothing -> ids
                           Just new_id -> new_id:ids) [] contents
      return . Right $ reverse jids

-- | Reads the job data from disk.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid = do
  let live_path = liveJobFile rootdir jid
      archived_path = archivedJobFile rootdir jid
      all_paths = if archived
                    then [(live_path, False), (archived_path, True)]
                    else [(live_path, False)]
  foldM (\state (path, isarchived) ->
           liftM (\r -> Just (r, isarchived)) (readFile path)
             `Control.Exception.catch`
             ignoreIOError state True
               ("Failed to read job file " ++ path)) Nothing all_paths

-- | Failed to load job error.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"

-- | Loads a job from disk.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: we need some strictness below, otherwise the wrapping in a
  -- Result will create too much laziness, and not close the file
  -- descriptors for the individual jobs
  return $! case raw of
             Nothing -> noSuchJob
             Just (str, arch) ->
               liftM (\qj -> (qj, arch)) .
               fromJResult "Parsing job file" $ Text.JSON.decode str

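-- Loading a job and checking whether it has already finished could look like
-- this (illustrative sketch; 'rootdir' and 'jid' are assumed to be in scope
-- and archived jobs are considered as well):
--
-- > do res <- loadJobFromDisk rootdir True jid
-- >    case res of
-- >      Ok (job, _) -> logInfo $ "job finalized: " ++ show (jobFinalized job)
-- >      Bad msg     -> logWarning msg
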
-- | Write a job to disk.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
writeJobToDisk rootdir job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  tryAndLogIOError (atomicWriteFile filename content)
                   ("Failed to write " ++ filename) Ok

-- | Replicate a job to all master candidates.
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
replicateJob rootdir mastercandidates job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  result <- executeRpcCall mastercandidates
              $ RpcCallJobqueueUpdate filename content
  logRpcErrors result
  return result

-- | Replicate many jobs to all master candidates.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates =
  mapM_ (replicateJob rootdir mastercandidates)

-- | Read the job serial number from disk.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk = do
  filename <- jobQueueSerialFile
  tryAndLogIOError (readFile filename) "Failed to read serial file"
                   (makeJobIdS . rStripSpace)

-- | Allocate new job ids.
-- To avoid races while accessing the serial file, the threads synchronize
-- on a lock, as usual provided by an MVar.
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n =
  if n <= 0
    then return . Bad $ "Can only allocate positive number of job ids"
    else do
      takeMVar lock
      rjobid <- readSerialFromDisk
      case rjobid of
        Bad s -> do
          putMVar lock ()
          return . Bad $ s
        Ok jid -> do
          let current = fromJobId jid
              serial_content = show (current + n) ++ "\n"
          serial <- jobQueueSerialFile
          write_result <- try $ atomicWriteFile serial serial_content
                          :: IO (Either IOError ())
          case write_result of
            Left e -> do
              putMVar lock ()
              let msg = "Failed to write serial file: " ++ show e
              logError msg
              return . Bad $ msg
            Right () -> do
              _ <- executeRpcCall mastercandidates
                     $ RpcCallJobqueueUpdate serial serial_content
              putMVar lock ()
              return $ mapM makeJobId [(current+1)..(current+n)]

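-- The lock must be shared by every caller that touches the serial file; a
-- minimal usage sketch (illustrative; 'candidates' stands for the list of
-- master candidate nodes):
--
-- > do lock <- newMVar ()
-- >    rids <- allocateJobIds candidates lock 3
-- >    -- rids is either an error or the next three fresh job ids
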
-- | Allocate one new job id.
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
allocateJobId mastercandidates lock = do
  jids <- allocateJobIds mastercandidates lock 1
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")

-- | Decide whether the job queue is open.
isQueueOpen :: IO Bool
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)

-- | Start enqueued jobs, currently by handing them over to masterd.
startJobs :: [QueuedJob] -> IO ()
startJobs jobs = do
  socketpath <- defaultMasterSocket
  client <- getClient socketpath
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
  let failures = map show $ justBad pickupResults
  unless (null failures)
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures