Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ c3a70209

History | View | Annotate | Download (15.9 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the job queue.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2010, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.JQueue
29
    ( QueuedOpCode(..)
30
    , QueuedJob(..)
31
    , InputOpCode(..)
32
    , queuedOpCodeFromMetaOpCode
33
    , queuedJobFromOpCodes
34
    , Timestamp
35
    , noTimestamp
36
    , currentTimestamp
37
    , opStatusFinalized
38
    , extractOpSummary
39
    , calcJobStatus
40
    , calcJobPriority
41
    , jobFileName
42
    , liveJobFile
43
    , archivedJobFile
44
    , determineJobDirectories
45
    , getJobIDs
46
    , sortJobIDs
47
    , loadJobFromDisk
48
    , noSuchJob
49
    , readSerialFromDisk
50
    , allocateJobIds
51
    , allocateJobId
52
    , writeJobToDisk
53
    , replicateManyJobs
54
    , isQueueOpen
55
    , enqueueJobs
56
    ) where
57

    
58
import Control.Concurrent.MVar
59
import Control.Exception
60
import Control.Monad
61
import Data.List
62
import Data.Maybe
63
import Data.Ord (comparing)
64
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
65
import Prelude hiding (log, id)
66
import System.Directory
67
import System.FilePath
68
import System.IO.Error (isDoesNotExistError)
69
import System.Posix.Files
70
import System.Time
71
import qualified Text.JSON
72
import Text.JSON.Types
73

    
74
import Ganeti.BasicTypes
75
import qualified Ganeti.Constants as C
76
import Ganeti.JSON
77
import Ganeti.Logging
78
import Ganeti.Luxi
79
import Ganeti.Objects (Node)
80
import Ganeti.OpCodes
81
import Ganeti.Path
82
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
83
                   RpcCallJobqueueUpdate(..))
84
import Ganeti.THH
85
import Ganeti.Types
86
import Ganeti.Utils
87

    
88
-- * Data types
89

    
90
-- | Timestamp format used by the job queue: a pair whose first
-- component is the number of seconds since the epoch and whose second
-- component is the number of microseconds elapsed within that second.
type Timestamp = (Int, Int)
94

    
95
-- | Placeholder value standing in for a missing timestamp; both
-- components are set to @-1@.
noTimestamp :: Timestamp
noTimestamp = (missing, missing)
  where missing = -1
98

    
99
-- | Reads the system clock and converts the result to the job-queue
-- 'Timestamp' format (seconds, microseconds).
currentTimestamp :: IO Timestamp
currentTimestamp = liftM toTimestamp getClockTime
  where
    -- 'TOD' carries whole seconds plus a picosecond remainder; dividing
    -- the remainder by 10^6 yields microseconds.
    toTimestamp (TOD secs picos) =
      (fromIntegral secs, fromIntegral (picos `div` 1000000))
104

    
105
-- | An opcode as received on input: either a successfully parsed
-- 'MetaOpCode', or the raw JSON value that could not be parsed.
data InputOpCode
  = ValidOpCode MetaOpCode  -- ^ The opcode was parsed successfully
  | InvalidOpCode JSValue   -- ^ The opcode is invalid; raw JSON is kept
  deriving (Show, Eq)
109

    
110
-- | JSON instance for 'InputOpCode'. Reading never fails: when the
-- value cannot be parsed as an opcode, the original 'JSValue' is kept
-- in an 'InvalidOpCode'. Showing an 'InvalidOpCode' gives that value
-- back unchanged.
instance Text.JSON.JSON InputOpCode where
  showJSON op =
    case op of
      ValidOpCode mo -> Text.JSON.showJSON mo
      InvalidOpCode inv -> inv
  readJSON v =
    case Text.JSON.readJSON v of
      Text.JSON.Ok mo -> Text.JSON.Ok (ValidOpCode mo)
      Text.JSON.Error _ -> Text.JSON.Ok (InvalidOpCode v)
118

    
119
-- | Summary string reported for opcodes that could not be parsed.
invalidOp :: String
invalidOp = "INVALID_OP"
122

    
123
-- | Extracts the opcode summary from an 'InputOpCode'. For valid
-- opcodes this delegates to 'opSummary'; for invalid opcodes given as
-- a JSON object, the @OP_ID@ field is used with its first three
-- characters (the @OP_@ prefix) removed. Anything else yields
-- 'invalidOp'. This duplicates some functionality from the
-- 'opSummary' function in "Ganeti.OpCodes".
extractOpSummary :: InputOpCode -> String
extractOpSummary input =
  case input of
    ValidOpCode metaop -> opSummary (metaOpCode metaop)
    InvalidOpCode (JSObject o) ->
      -- a missing OP_ID falls back to "OP_" ++ invalidOp, so that
      -- stripping the prefix gives invalidOp again
      maybe invalidOp (drop 3) $
        fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp)
    _ -> invalidOp
133

    
134
-- Template Haskell splice generating the QueuedOpCode record. Field
-- accessors carry the "qo" prefix (qoInput, qoStatus, ...). The "log"
-- field defaults to the empty list, and the three timestamp fields
-- are optional.
$(buildObject "QueuedOpCode" "qo"
  [ simpleField "input" [t| InputOpCode |]
  , simpleField "status" [t| OpStatus |]
  , simpleField "result" [t| JSValue |]
  , defaultField [| [] |]
      (simpleField "log" [t| [(Int, Timestamp, ELogType, JSValue)] |])
  , simpleField "priority" [t| Int |]
  , optionalNullSerField (simpleField "start_timestamp" [t| Timestamp |])
  , optionalNullSerField (simpleField "exec_timestamp" [t| Timestamp |])
  , optionalNullSerField (simpleField "end_timestamp" [t| Timestamp |])
  ])
148

    
149
-- Template Haskell splice generating the QueuedJob record. Field
-- accessors carry the "qj" prefix (qjId, qjOps, ...). The three
-- timestamp fields are optional.
$(buildObject "QueuedJob" "qj"
  [ simpleField "id" [t| JobId |]
  , simpleField "ops" [t| [QueuedOpCode] |]
  , optionalNullSerField
      (simpleField "received_timestamp" [t| Timestamp |])
  , optionalNullSerField (simpleField "start_timestamp" [t| Timestamp |])
  , optionalNullSerField (simpleField "end_timestamp" [t| Timestamp |])
  ])
159

    
160
-- | Wraps a 'MetaOpCode' into a freshly queued 'QueuedOpCode': status
-- queued, empty log, null result, no timestamps, and the priority
-- taken from the opcode's meta parameters.
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode { qoInput = ValidOpCode op
               , qoStatus = OP_STATUS_QUEUED
               , qoResult = JSNull
               , qoLog = []
               , qoPriority = priority
               , qoStartTimestamp = Nothing
               , qoExecTimestamp = Nothing
               , qoEndTimestamp = Nothing
               }
  where
    priority = opSubmitPriorityToRaw (opPriority (metaParams op))
173

    
174
-- | Builds a job from a job id and a list of opcodes. Every opcode is
-- first passed through 'resolveDependencies' together with the job
-- id; the resulting job has no timestamps set. This is the pure part
-- of job creation, as allocating a new job id lives in IO.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops =
  liftM mkJob (mapM (\op -> resolveDependencies op jobid) ops)
  where
    mkJob resolved =
      QueuedJob { qjId = jobid
                , qjOps = map queuedOpCodeFromMetaOpCode resolved
                , qjReceivedTimestamp = Nothing
                , qjStartTimestamp = Nothing
                , qjEndTimestamp = Nothing
                }
186

    
187
-- | Prefix shared by the names of all job files.
jobFilePrefix :: String
jobFilePrefix = "job-"
190

    
191
-- | Computes the file name (without directory) for a given job ID:
-- 'jobFilePrefix' followed by the shown numeric id.
jobFileName :: JobId -> FilePath
jobFileName = (jobFilePrefix ++) . show . fromJobId
194

    
195
-- | Parses a job ID from a file name. Fails in the surrounding monad
-- when the name does not start with 'jobFilePrefix'; otherwise the
-- remainder of the name is handed to 'makeJobIdS'.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  maybe (fail $ "Job file '" ++ path ++ "' doesn't have the correct prefix")
        makeJobIdS
        (stripPrefix jobFilePrefix path)
202

    
203
-- | Computes the full path of a live job's file: the job file name
-- placed directly under the queue root directory.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir = (rootdir </>) . jobFileName
206

    
207
-- | Computes the full path of an archived job's file. Archived jobs
-- are grouped into numbered subdirectories of the archive directory,
-- the number being the job id divided by
-- 'C.jstoreJobsPerArchiveDirectory'.
--
-- NOTE: the original author flagged this function as BROKEN without
-- recording the reason; verify its results before relying on them.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
  where
    subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
212

    
213
-- | Translates an opcode status into the job status of the same name.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob status =
  case status of
    OP_STATUS_QUEUED    -> JOB_STATUS_QUEUED
    OP_STATUS_WAITING   -> JOB_STATUS_WAITING
    OP_STATUS_SUCCESS   -> JOB_STATUS_SUCCESS
    OP_STATUS_RUNNING   -> JOB_STATUS_RUNNING
    OP_STATUS_CANCELING -> JOB_STATUS_CANCELING
    OP_STATUS_CANCELED  -> JOB_STATUS_CANCELED
    OP_STATUS_ERROR     -> JOB_STATUS_ERROR
222

    
223
-- | Computes a queued job's status from the statuses of its opcodes,
-- scanned in order:
--
-- * the first error, canceling or canceled opcode decides the job
--   status immediately;
--
-- * success and queued opcodes leave the running answer untouched;
--
-- * any other opcode status replaces the running answer (which starts
--   out as queued);
--
-- * if every opcode succeeded (or there are none), the job succeeded.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus job = go JOB_STATUS_QUEUED True (map qoStatus (qjOps job))
  where
    isTerminal = (`elem` [ OP_STATUS_ERROR
                         , OP_STATUS_CANCELING
                         , OP_STATUS_CANCELED ])
    isSoft = (`elem` [OP_STATUS_SUCCESS, OP_STATUS_QUEUED])
    -- go <answer so far> <all opcodes seen so far succeeded> <remaining>
    go fallback all_ok [] = if all_ok then JOB_STATUS_SUCCESS else fallback
    go fallback all_ok (st:rest)
      | isTerminal st = opStatusToJob st -- stop scanning
      | isSoft st     = go fallback still_ok rest
      | otherwise     = go (opStatusToJob st) still_ok rest
      where still_ok = st == OP_STATUS_SUCCESS && all_ok
242

    
243
-- | Tells whether an opcode status is finalized, i.e. whether it
-- orders strictly after 'OP_STATUS_RUNNING'.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized status = status > OP_STATUS_RUNNING
246

    
247
-- | Computes a job's priority: the smallest priority value among its
-- not yet finalized opcodes, or 'C.opPrioDefault' when no such opcode
-- is left.
calcJobPriority :: QueuedJob -> Int
calcJobPriority job =
  case pending of
    [] -> C.opPrioDefault
    _  -> minimum pending
  where
    pending = [ qoPriority op
              | op <- qjOps job
              , not (opStatusFinalized (qoStatus op)) ]
253

    
254
-- | Exception handler that swallows an 'IOError' and returns the
-- given fallback value. The error is logged as a warning, unless it
-- is a "does not exist" error and @ignore_noent@ is set.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  let silenced = ignore_noent && isDoesNotExistError e
  unless silenced $ logWarning (msg ++ ": " ++ show e)
  return a
260

    
261
-- | Computes the list of existing archive directories, i.e. the
-- non-hidden entries of the archive subdirectory of @rootdir@ that
-- are directories. The returned paths include the archive directory
-- prefix.
--
-- Note that I/O exceptions are swallowed: a failure to list the
-- archive directory is logged and yields an empty list, and an entry
-- that cannot be stat'ed is dropped (logged unless it merely
-- vanished).
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  -- skip hidden entries (this also removes "." and "..")
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  -- Every element of fpaths already carries the adir prefix, so it is
  -- stat'ed as-is; joining adir a second time would produce a wrong
  -- path whenever rootdir is relative.
  filterM (\path ->
             liftM isDirectory (getFileStatus path)
               `Control.Exception.catch`
               ignoreIOError False True
                 ("Failed to stat archive path " ++ path)) fpaths
275

    
276
-- | Builds the list of directories containing job files: always the
-- queue root directory, followed by all archive directories when
-- @archived@ is set. Note: compared to the Python version, this
-- doesn't ignore a potential lost+found file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived
  | archived  = liftM (rootdir :) (allArchiveDirs rootdir)
  | otherwise = return [rootdir]
285

    
286
-- | Collects a list of per-directory results into a single result,
-- keeping the original order; the first error encountered wins. This
-- is equivalent to the \'sequence\' function, which cannot be used
-- because of a library version conflict on Lucid.
-- FIXME: delete this and just use \'sequence\' instead when Lucid
-- compatibility will not be required anymore.
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
sequencer = fmap reverse . foldl seqFolder (Right [])
292

    
293
-- | Folding step used by 'sequencer': prepends a successful element
-- to the accumulated list, and propagates the first error seen
-- (an error already in the accumulator takes precedence).
seqFolder :: Either IOError [[JobId]]
          -> Either IOError [JobId]
          -> Either IOError [[JobId]]
seqFolder acc item =
  case (acc, item) of
    (Left e, _)         -> Left e
    (Right _, Left e)   -> Left e
    (Right l, Right el) -> Right (el : l)
300

    
301
-- | Computes the list of all job IDs found in the given directories,
-- in directory order. If listing any directory fails, the first such
-- error is returned instead.
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
getJobIDs paths = do
  per_dir <- mapM getDirJobIDs paths
  return . fmap concat $ sequencer per_dir
304

    
305
-- | Sorts a list of job IDs in ascending order of their numeric
-- value, as given by 'fromJobId'.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy (\a b -> compare (fromJobId a) (fromJobId b))
308

    
309
-- | Computes the list of job IDs in a given directory. Directory
-- entries whose names do not parse as job files are silently skipped;
-- the order of the directory listing is preserved. A failure to list
-- the directory is logged as a warning and returned as 'Left'.
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
getDirJobIDs path = do
  listing <- try (getDirectoryContents path) :: IO (Either IOError [FilePath])
  case listing of
    Right contents -> return . Right $ mapMaybe parseJobFileId contents
    Left e -> do
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
      return $ Left e
324

    
325
-- | Reads the raw job data from disk. The live job file is always
-- tried; the archived job file is tried as well when @archived@ is
-- set. The result pairs the file contents with a flag telling whether
-- they came from the archive, or is 'Nothing' if no file could be
-- read. Every candidate is attempted in turn, and a later successful
-- read replaces an earlier one. Read failures other than a missing
-- file are logged as warnings.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid =
  foldM attempt Nothing candidates
  where
    live = (liveJobFile rootdir jid, False)
    arch = (archivedJobFile rootdir jid, True)
    candidates = live : [arch | archived]
    -- on failure, keep whatever was obtained from earlier candidates
    attempt previous (path, isarchived) =
      (do raw <- readFile path
          return $ Just (raw, isarchived))
        `Control.Exception.catch`
        ignoreIOError previous True ("Failed to read job file " ++ path)
338

    
339
-- | Error result returned when a job file could not be loaded.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"
342

    
343
-- | Loads a job from disk and parses it. The boolean in the result
-- tells whether the job was read from the archive. Yields 'noSuchJob'
-- when no job file could be read, and a parse error when the file
-- contents are not a valid serialized job.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  contents <- readJobDataFromDisk rootdir archived jid
  -- note: the strict application is needed here; otherwise wrapping
  -- in a Result creates too much laziness, and the file descriptors
  -- of the individual jobs are not closed
  return $! maybe noSuchJob parseJob contents
  where
    parseJob (text, is_archived) = do
      job <- fromJResult "Parsing job file" (Text.JSON.decode text)
      return (job, is_archived)
355

    
356
-- | Serializes a job to JSON and writes it to its live job file under
-- @rootdir@, using 'atomicWriteFile'. The write goes through
-- 'tryAndLogIOError', so an I/O failure is expected to surface in the
-- 'Result' rather than as an exception.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
writeJobToDisk rootdir job =
  tryAndLogIOError (atomicWriteFile path payload)
                   ("Failed to write " ++ path) Ok
  where
    path = liveJobFile rootdir (qjId job)
    payload = Text.JSON.encode (Text.JSON.showJSON job)
363

    
364
-- | Replicates a job to all given master candidates by sending them
-- the job's file name and serialized contents in a job queue update
-- RPC. RPC errors are logged via 'logRpcErrors'; the per-node results
-- are returned to the caller.
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
replicateJob rootdir mastercandidates job = do
  let path = liveJobFile rootdir (qjId job)
      payload = Text.JSON.encode (Text.JSON.showJSON job)
  outcome <- executeRpcCall mastercandidates
               (RpcCallJobqueueUpdate path payload)
  logRpcErrors outcome
  return outcome
373

    
374
-- | Replicates each of the given jobs, in order, to all master
-- candidates; the individual RPC results are discarded.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates jobs =
  forM_ jobs (replicateJob rootdir mastercandidates)
378

    
379
-- | Reads the job serial number from the serial file. Trailing
-- whitespace is stripped from the file contents before they are
-- parsed as a job id.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk =
  jobQueueSerialFile >>= \serial_path ->
    tryAndLogIOError (readFile serial_path) "Failed to read serial file"
                     (makeJobIdS . rStripSpace)
385

    
386
-- | Allocates @n@ new, consecutive job ids.
--
-- The current serial is read from the serial file, the serial file is
-- rewritten with the serial advanced by @n@, the new serial file
-- contents are sent to the master candidates (the RPC results are
-- ignored), and the ids @current+1 .. current+n@ are returned.
--
-- Returns 'Bad' if @n@ is not positive, if the serial file cannot be
-- read or parsed, or if writing the new serial fails.
--
-- To avoid races while accessing the serial file, the threads
-- synchronize over a lock provided by an 'MVar'. The lock is held via
-- 'withMVar', so it is released even if an exception is raised while
-- it is held; otherwise one failed allocation would block all later
-- ones forever.
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n
  | n <= 0 = return . Bad $ "Can only allocate positive number of job ids"
  | otherwise = withMVar lock $ \_ -> do
      rjobid <- readSerialFromDisk
      case rjobid of
        Bad s -> return $ Bad s
        Ok jid -> do
          let current = fromJobId jid
              serial_content = show (current + n) ++ "\n"
          serial <- jobQueueSerialFile
          write_result <- try $ atomicWriteFile serial serial_content
                          :: IO (Either IOError ())
          case write_result of
            Left e -> do
              let msg = "Failed to write serial file: " ++ show e
              logError msg
              return $ Bad msg
            Right () -> do
              -- best effort: the replication result is not inspected
              _ <- executeRpcCall mastercandidates
                     $ RpcCallJobqueueUpdate serial serial_content
              return $ mapM makeJobId [(current + 1)..(current + n)]
417

    
418
-- | Allocates a single new job id, by requesting one id from
-- 'allocateJobIds' and extracting it from the returned list with
-- 'monadicThe'.
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
allocateJobId mastercandidates lock =
  liftM (>>= monadicThe "Failed to allocate precisely one Job ID")
        (allocateJobIds mastercandidates lock 1)
423

    
424
-- | Decides whether the job queue is open, i.e. whether the queue's
-- drain file does not exist.
isQueueOpen :: IO Bool
isQueueOpen = do
  drain_file <- jobQueueDrainFile
  drained <- doesFileExist drain_file
  return (not drained)
427

    
428
-- | Enqueues jobs. This will guarantee that jobs get executed
-- eventually. Currently, the implementation is to unconditionally
-- hand over each job to masterd, by sending a job pickup request over
-- the default master socket. Failed requests are not retried; they
-- are only reported in a single warning.
enqueueJobs :: [QueuedJob] -> IO ()
enqueueJobs jobs = do
  socketpath <- defaultMasterSocket
  client <- getClient socketpath
  pickupResults <- forM jobs $ \job ->
    callMethod (PickupJob (qjId job)) client
  let failures = map show (justBad pickupResults)
  unless (null failures) $
    logWarning ("Failed to notify masterd: " ++ commaJoin failures)