Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ 370f63be

History | View | Annotate | Download (17.6 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the job queue.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2010, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.JQueue
29
    ( QueuedOpCode(..)
30
    , QueuedJob(..)
31
    , InputOpCode(..)
32
    , queuedOpCodeFromMetaOpCode
33
    , queuedJobFromOpCodes
34
    , cancelQueuedJob
35
    , Timestamp
36
    , noTimestamp
37
    , currentTimestamp
38
    , setReceivedTimestamp
39
    , opStatusFinalized
40
    , extractOpSummary
41
    , calcJobStatus
42
    , jobStarted
43
    , jobFinalized
44
    , jobArchivable
45
    , calcJobPriority
46
    , jobFileName
47
    , liveJobFile
48
    , archivedJobFile
49
    , determineJobDirectories
50
    , getJobIDs
51
    , sortJobIDs
52
    , loadJobFromDisk
53
    , noSuchJob
54
    , readSerialFromDisk
55
    , allocateJobIds
56
    , allocateJobId
57
    , writeJobToDisk
58
    , replicateManyJobs
59
    , isQueueOpen
60
    , startJobs
61
    , cancelJob
62
    ) where
63

    
64
import Control.Applicative (liftA2, (<|>))
65
import Control.Arrow (second)
66
import Control.Concurrent.MVar
67
import Control.Exception
68
import Control.Monad
69
import Data.Functor ((<$))
70
import Data.List
71
import Data.Maybe
72
import Data.Ord (comparing)
73
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
74
import Prelude hiding (id, log)
75
import System.Directory
76
import System.FilePath
77
import System.IO.Error (isDoesNotExistError)
78
import System.Posix.Files
79
import System.Time
80
import qualified Text.JSON
81
import Text.JSON.Types
82

    
83
import Ganeti.BasicTypes
84
import qualified Ganeti.Constants as C
85
import Ganeti.Errors (ErrorResult)
86
import Ganeti.JSON
87
import Ganeti.Logging
88
import Ganeti.Luxi
89
import Ganeti.Objects (Node)
90
import Ganeti.OpCodes
91
import Ganeti.Path
92
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
93
                   RpcCallJobqueueUpdate(..))
94
import Ganeti.THH
95
import Ganeti.Types
96
import Ganeti.Utils
97
import Ganeti.VCluster (makeVirtualPath)
98

    
99
-- * Data types
100

    
101
-- | The ganeti queue timestamp type. It represents the time as the pair
102
-- of seconds since the epoch and microseconds since the beginning of the
103
-- second.
104
type Timestamp = (Int, Int)
105

    
106
-- | Missing timestamp type.
107
noTimestamp :: Timestamp
108
noTimestamp = (-1, -1)
109

    
110
-- | Get the current time in the job-queue timestamp format.
111
currentTimestamp :: IO Timestamp
112
currentTimestamp = do
113
  TOD ctime pico <- getClockTime
114
  return (fromIntegral ctime, fromIntegral $ pico `div` 1000000)
115

    
116
-- | An input opcode.
117
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
118
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
119
                   deriving (Show, Eq)
120

    
121
-- | JSON instance for 'InputOpCode', trying to parse it and if
122
-- failing, keeping the original JSValue.
123
instance Text.JSON.JSON InputOpCode where
124
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
125
  showJSON (InvalidOpCode inv) = inv
126
  readJSON v = case Text.JSON.readJSON v of
127
                 Text.JSON.Error _ -> return $ InvalidOpCode v
128
                 Text.JSON.Ok mo -> return $ ValidOpCode mo
129

    
130
-- | Invalid opcode summary.
131
invalidOp :: String
132
invalidOp = "INVALID_OP"
133

    
134
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
135
-- duplicates some functionality from the 'opSummary' function in
136
-- "Ganeti.OpCodes".
137
extractOpSummary :: InputOpCode -> String
138
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
139
extractOpSummary (InvalidOpCode (JSObject o)) =
140
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
141
    Just s -> drop 3 s -- drop the OP_ prefix
142
    Nothing -> invalidOp
143
extractOpSummary _ = invalidOp
144

    
145
$(buildObject "QueuedOpCode" "qo"
146
  [ simpleField "input"           [t| InputOpCode |]
147
  , simpleField "status"          [t| OpStatus    |]
148
  , simpleField "result"          [t| JSValue     |]
149
  , defaultField [| [] |] $
150
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
151
  , simpleField "priority"        [t| Int         |]
152
  , optionalNullSerField $
153
    simpleField "start_timestamp" [t| Timestamp   |]
154
  , optionalNullSerField $
155
    simpleField "exec_timestamp"  [t| Timestamp   |]
156
  , optionalNullSerField $
157
    simpleField "end_timestamp"   [t| Timestamp   |]
158
  ])
159

    
160
$(buildObject "QueuedJob" "qj"
161
  [ simpleField "id"                 [t| JobId          |]
162
  , simpleField "ops"                [t| [QueuedOpCode] |]
163
  , optionalNullSerField $
164
    simpleField "received_timestamp" [t| Timestamp      |]
165
  , optionalNullSerField $
166
    simpleField "start_timestamp"    [t| Timestamp      |]
167
  , optionalNullSerField $
168
    simpleField "end_timestamp"      [t| Timestamp      |]
169
  ])
170

    
171
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
172
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
173
queuedOpCodeFromMetaOpCode op =
174
  QueuedOpCode { qoInput = ValidOpCode op
175
               , qoStatus = OP_STATUS_QUEUED
176
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
177
                              $ op
178
               , qoLog = []
179
               , qoResult = JSNull
180
               , qoStartTimestamp = Nothing
181
               , qoEndTimestamp = Nothing
182
               , qoExecTimestamp = Nothing
183
               }
184

    
185
-- | From a job-id and a list of op-codes create a job. This is
186
-- the pure part of job creation, as allocating a new job id
187
-- lives in IO.
188
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
189
queuedJobFromOpCodes jobid ops = do
190
  ops' <- mapM (`resolveDependencies` jobid) ops
191
  return QueuedJob { qjId = jobid
192
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
193
                   , qjReceivedTimestamp = Nothing 
194
                   , qjStartTimestamp = Nothing
195
                   , qjEndTimestamp = Nothing
196
                   }
197

    
198
-- | Attach a received timestamp to a Queued Job.
199
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
200
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }
201

    
202
-- | Set the state of a QueuedOpCode to canceled.
203
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
204
cancelOpCode now op =
205
  op { qoStatus = OP_STATUS_CANCELED, qoEndTimestamp = Just now }
206

    
207
-- | Transform a QueuedJob that has not been started into its canceled form.
208
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
209
cancelQueuedJob now job =
210
  let ops' = map (cancelOpCode now) $ qjOps job
211
  in job { qjOps = ops', qjEndTimestamp = Just now}
212

    
213
-- | Job file prefix.
214
jobFilePrefix :: String
215
jobFilePrefix = "job-"
216

    
217
-- | Computes the filename for a given job ID.
218
jobFileName :: JobId -> FilePath
219
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
220

    
221
-- | Parses a job ID from a file name.
222
parseJobFileId :: (Monad m) => FilePath -> m JobId
223
parseJobFileId path =
224
  case stripPrefix jobFilePrefix path of
225
    Nothing -> fail $ "Job file '" ++ path ++
226
                      "' doesn't have the correct prefix"
227
    Just suffix -> makeJobIdS suffix
228

    
229
-- | Computes the full path to a live job.
230
liveJobFile :: FilePath -> JobId -> FilePath
231
liveJobFile rootdir jid = rootdir </> jobFileName jid
232

    
233
-- | Computes the full path to an archives job. BROKEN.
234
archivedJobFile :: FilePath -> JobId -> FilePath
235
archivedJobFile rootdir jid =
236
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
237
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
238

    
239
-- | Map from opcode status to job status.
240
opStatusToJob :: OpStatus -> JobStatus
241
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
242
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
243
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
244
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
245
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
246
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
247
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR
248

    
249
-- | Computes a queued job's status.
250
calcJobStatus :: QueuedJob -> JobStatus
251
calcJobStatus QueuedJob { qjOps = ops } =
252
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
253
    where
254
      terminalStatus OP_STATUS_ERROR     = True
255
      terminalStatus OP_STATUS_CANCELING = True
256
      terminalStatus OP_STATUS_CANCELED  = True
257
      terminalStatus _                   = False
258
      softStatus     OP_STATUS_SUCCESS   = True
259
      softStatus     OP_STATUS_QUEUED    = True
260
      softStatus     _                   = False
261
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
262
      extractOpSt [] d False = d
263
      extractOpSt (x:xs) d old_all
264
           | terminalStatus x = opStatusToJob x -- abort recursion
265
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
266
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
267
           where new_all = x == OP_STATUS_SUCCESS && old_all
268

    
269
-- | Determine if a job has started
270
jobStarted :: QueuedJob -> Bool
271
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus
272

    
273
-- | Determine if a job is finalised.
274
jobFinalized :: QueuedJob -> Bool
275
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus
276

    
277
-- | Determine if a job is finalized and its timestamp is before
278
-- a given time.
279
jobArchivable :: Timestamp -> QueuedJob -> Bool
280
jobArchivable ts = liftA2 (&&) jobFinalized 
281
  $ maybe False (< ts)
282
    .  liftA2 (<|>) qjEndTimestamp qjStartTimestamp
283

    
284
-- | Determine whether an opcode status is finalized.
285
opStatusFinalized :: OpStatus -> Bool
286
opStatusFinalized = (> OP_STATUS_RUNNING)
287

    
288
-- | Compute a job's priority.
289
calcJobPriority :: QueuedJob -> Int
290
calcJobPriority QueuedJob { qjOps = ops } =
291
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
292
    where helper [] = C.opPrioDefault
293
          helper ps = minimum ps
294

    
295
-- | Log but ignore an 'IOError'.
296
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
297
ignoreIOError a ignore_noent msg e = do
298
  unless (isDoesNotExistError e && ignore_noent) .
299
    logWarning $ msg ++ ": " ++ show e
300
  return a
301

    
302
-- | Compute the list of existing archive directories. Note that I/O
303
-- exceptions are swallowed and ignored.
304
allArchiveDirs :: FilePath -> IO [FilePath]
305
allArchiveDirs rootdir = do
306
  let adir = rootdir </> jobQueueArchiveSubDir
307
  contents <- getDirectoryContents adir `Control.Exception.catch`
308
               ignoreIOError [] False
309
                 ("Failed to list queue directory " ++ adir)
310
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
311
  filterM (\path ->
312
             liftM isDirectory (getFileStatus (adir </> path))
313
               `Control.Exception.catch`
314
               ignoreIOError False True
315
                 ("Failed to stat archive path " ++ path)) fpaths
316

    
317
-- | Build list of directories containing job files. Note: compared to
318
-- the Python version, this doesn't ignore a potential lost+found
319
-- file.
320
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
321
determineJobDirectories rootdir archived = do
322
  other <- if archived
323
             then allArchiveDirs rootdir
324
             else return []
325
  return $ rootdir:other
326

    
327
-- Function equivalent to the \'sequence\' function, that cannot be used because
328
-- of library version conflict on Lucid.
329
-- FIXME: delete this and just use \'sequence\' instead when Lucid compatibility
330
-- will not be required anymore.
331
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
332
sequencer l = fmap reverse $ foldl seqFolder (Right []) l
333

    
334
-- | Folding function for joining multiple [JobIds] into one list.
335
seqFolder :: Either IOError [[JobId]]
336
          -> Either IOError [JobId]
337
          -> Either IOError [[JobId]]
338
seqFolder (Left e) _ = Left e
339
seqFolder (Right _) (Left e) = Left e
340
seqFolder (Right l) (Right el) = Right $ el:l
341

    
342
-- | Computes the list of all jobs in the given directories.
343
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
344
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)
345

    
346
-- | Sorts the a list of job IDs.
347
sortJobIDs :: [JobId] -> [JobId]
348
sortJobIDs = sortBy (comparing fromJobId)
349

    
350
-- | Computes the list of jobs in a given directory.
351
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
352
getDirJobIDs path = do
353
  either_contents <-
354
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
355
  case either_contents of
356
    Left e -> do
357
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
358
      return $ Left e
359
    Right contents -> do
360
      let jids = foldl (\ids file ->
361
                         case parseJobFileId file of
362
                           Nothing -> ids
363
                           Just new_id -> new_id:ids) [] contents
364
      return . Right $ reverse jids
365

    
366
-- | Reads the job data from disk.
367
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
368
readJobDataFromDisk rootdir archived jid = do
369
  let live_path = liveJobFile rootdir jid
370
      archived_path = archivedJobFile rootdir jid
371
      all_paths = if archived
372
                    then [(live_path, False), (archived_path, True)]
373
                    else [(live_path, False)]
374
  foldM (\state (path, isarchived) ->
375
           liftM (\r -> Just (r, isarchived)) (readFile path)
376
             `Control.Exception.catch`
377
             ignoreIOError state True
378
               ("Failed to read job file " ++ path)) Nothing all_paths
379

    
380
-- | Failed to load job error.
381
noSuchJob :: Result (QueuedJob, Bool)
382
noSuchJob = Bad "Can't load job file"
383

    
384
-- | Loads a job from disk.
385
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
386
loadJobFromDisk rootdir archived jid = do
387
  raw <- readJobDataFromDisk rootdir archived jid
388
  -- note: we need some stricness below, otherwise the wrapping in a
389
  -- Result will create too much lazyness, and not close the file
390
  -- descriptors for the individual jobs
391
  return $! case raw of
392
             Nothing -> noSuchJob
393
             Just (str, arch) ->
394
               liftM (\qj -> (qj, arch)) .
395
               fromJResult "Parsing job file" $ Text.JSON.decode str
396

    
397
-- | Write a job to disk.
398
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
399
writeJobToDisk rootdir job = do
400
  let filename = liveJobFile rootdir . qjId $ job
401
      content = Text.JSON.encode . Text.JSON.showJSON $ job
402
  tryAndLogIOError (atomicWriteFile filename content)
403
                   ("Failed to write " ++ filename) Ok
404

    
405
-- | Replicate a job to all master candidates.
406
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
407
replicateJob rootdir mastercandidates job = do
408
  let filename = liveJobFile rootdir . qjId $ job
409
      content = Text.JSON.encode . Text.JSON.showJSON $ job
410
  filename' <- makeVirtualPath filename
411
  callresult <- executeRpcCall mastercandidates
412
                  $ RpcCallJobqueueUpdate filename' content
413
  let result = map (second (() <$)) callresult
414
  logRpcErrors result
415
  return result
416

    
417
-- | Replicate many jobs to all master candidates.
418
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
419
replicateManyJobs rootdir mastercandidates =
420
  mapM_ (replicateJob rootdir mastercandidates)
421

    
422
-- | Read the job serial number from disk.
423
readSerialFromDisk :: IO (Result JobId)
424
readSerialFromDisk = do
425
  filename <- jobQueueSerialFile
426
  tryAndLogIOError (readFile filename) "Failed to read serial file"
427
                   (makeJobIdS . rStripSpace)
428

    
429
-- | Allocate new job ids.
430
-- To avoid races while accessing the serial file, the threads synchronize
431
-- over a lock, as usual provided by an MVar.
432
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
433
allocateJobIds mastercandidates lock n =
434
  if n <= 0
435
    then return . Bad $ "Can only allocate positive number of job ids"
436
    else do
437
      takeMVar lock
438
      rjobid <- readSerialFromDisk
439
      case rjobid of
440
        Bad s -> do
441
          putMVar lock ()
442
          return . Bad $ s
443
        Ok jid -> do
444
          let current = fromJobId jid
445
              serial_content = show (current + n) ++  "\n"
446
          serial <- jobQueueSerialFile
447
          write_result <- try $ atomicWriteFile serial serial_content
448
                          :: IO (Either IOError ())
449
          case write_result of
450
            Left e -> do
451
              putMVar lock ()
452
              let msg = "Failed to write serial file: " ++ show e
453
              logError msg
454
              return . Bad $ msg 
455
            Right () -> do
456
              serial' <- makeVirtualPath serial
457
              _ <- executeRpcCall mastercandidates
458
                     $ RpcCallJobqueueUpdate serial' serial_content
459
              putMVar lock ()
460
              return $ mapM makeJobId [(current+1)..(current+n)]
461

    
462
-- | Allocate one new job id.
463
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
464
allocateJobId mastercandidates lock = do
465
  jids <- allocateJobIds mastercandidates lock 1
466
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")
467

    
468
-- | Decide if job queue is open
469
isQueueOpen :: IO Bool
470
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)
471

    
472
-- | Start enqueued jobs, currently by handing them over to masterd.
473
startJobs :: [QueuedJob] -> IO ()
474
startJobs jobs = do
475
  socketpath <- defaultMasterSocket
476
  client <- getLuxiClient socketpath
477
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
478
  let failures = map show $ justBad pickupResults
479
  unless (null failures)
480
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures
481

    
482
-- | Try to cancel a job that has already been handed over to execution,
483
-- currently by asking masterd to cancel it.
484
cancelJob :: JobId -> IO (ErrorResult JSValue)
485
cancelJob jid = do
486
  socketpath <- defaultMasterSocket
487
  client <- getLuxiClient socketpath
488
  callMethod (CancelJob jid) client