src/Ganeti/JQueue.hs @ 77676415
{-# LANGUAGE TemplateHaskell #-}

{-| Implementation of the job queue.

-}

{-

Copyright (C) 2010, 2012 Google Inc.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.

-}

module Ganeti.JQueue
    ( QueuedOpCode(..)
    , QueuedJob(..)
    , InputOpCode(..)
    , queuedOpCodeFromMetaOpCode
    , queuedJobFromOpCodes
    , cancelQueuedJob
    , Timestamp
    , noTimestamp
    , currentTimestamp
    , setReceivedTimestamp
    , opStatusFinalized
    , extractOpSummary
    , calcJobStatus
    , jobStarted
    , jobFinalized
    , calcJobPriority
    , jobFileName
    , liveJobFile
    , archivedJobFile
    , determineJobDirectories
    , getJobIDs
    , sortJobIDs
    , loadJobFromDisk
    , noSuchJob
    , readSerialFromDisk
    , allocateJobIds
    , allocateJobId
    , writeJobToDisk
    , replicateManyJobs
    , isQueueOpen
    , startJobs
    , cancelJob
    ) where

import Control.Arrow (second)
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Data.Functor ((<$))
import Data.List
import Data.Maybe
import Data.Ord (comparing)
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
import Prelude hiding (id, log)
import System.Directory
import System.FilePath
import System.IO.Error (isDoesNotExistError)
import System.Posix.Files
import System.Time
import qualified Text.JSON
import Text.JSON.Types

import Ganeti.BasicTypes
import qualified Ganeti.Constants as C
import Ganeti.Errors (ErrorResult)
import Ganeti.JSON
import Ganeti.Logging
import Ganeti.Luxi
import Ganeti.Objects (Node)
import Ganeti.OpCodes
import Ganeti.Path
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
                   RpcCallJobqueueUpdate(..))
import Ganeti.THH
import Ganeti.Types
import Ganeti.Utils
import Ganeti.VCluster (makeVirtualPath)

-- * Data types

-- | The Ganeti queue timestamp type. It represents the time as the pair
-- of seconds since the epoch and microseconds since the beginning of the
-- second.
type Timestamp = (Int, Int)

-- | Missing timestamp type.
noTimestamp :: Timestamp
noTimestamp = (-1, -1)

-- | Get the current time in the job-queue timestamp format.
currentTimestamp :: IO Timestamp
currentTimestamp = do
  TOD ctime pico <- getClockTime
  return (fromIntegral ctime, fromIntegral $ pico `div` 1000000)

-- | An input opcode.
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
                   deriving (Show, Eq)

-- | JSON instance for 'InputOpCode', trying to parse it and, if that
-- fails, keeping the original JSValue.
instance Text.JSON.JSON InputOpCode where
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
  showJSON (InvalidOpCode inv) = inv
  readJSON v = case Text.JSON.readJSON v of
                 Text.JSON.Error _ -> return $ InvalidOpCode v
                 Text.JSON.Ok mo -> return $ ValidOpCode mo

-- | Invalid opcode summary.
invalidOp :: String
invalidOp = "INVALID_OP"

-- | Tries to extract the opcode summary from an 'InputOpCode'. This
-- duplicates some functionality from the 'opSummary' function in
-- "Ganeti.OpCodes".
extractOpSummary :: InputOpCode -> String
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
extractOpSummary (InvalidOpCode (JSObject o)) =
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
    Just s -> drop 3 s -- drop the OP_ prefix
    Nothing -> invalidOp
extractOpSummary _ = invalidOp
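-- A usage sketch (comment only, not part of the original module): decoding
-- an opcode the parser does not recognise yields an 'InvalidOpCode' that
-- keeps the raw JSON, and its summary falls back to the OP_ID field. The
-- literal JSON string below is purely illustrative.
--
-- > summaryOfRaw :: String -> String
-- > summaryOfRaw raw =
-- >   case Text.JSON.decode raw :: Text.JSON.Result InputOpCode of
-- >     Text.JSON.Ok op   -> extractOpSummary op
-- >     Text.JSON.Error _ -> invalidOp
-- >
-- > -- summaryOfRaw "{\"OP_ID\": \"OP_FROBNICATE\"}" == "FROBNICATE"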

$(buildObject "QueuedOpCode" "qo"
  [ simpleField "input"           [t| InputOpCode |]
  , simpleField "status"          [t| OpStatus    |]
  , simpleField "result"          [t| JSValue     |]
  , defaultField [| [] |] $
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
  , simpleField "priority"        [t| Int         |]
  , optionalNullSerField $
    simpleField "start_timestamp" [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "exec_timestamp"  [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "end_timestamp"   [t| Timestamp   |]
  ])

$(buildObject "QueuedJob" "qj"
  [ simpleField "id"                 [t| JobId          |]
  , simpleField "ops"                [t| [QueuedOpCode] |]
  , optionalNullSerField $
    simpleField "received_timestamp" [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "start_timestamp"    [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "end_timestamp"      [t| Timestamp      |]
  ])

-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode { qoInput = ValidOpCode op
               , qoStatus = OP_STATUS_QUEUED
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
                              $ op
               , qoLog = []
               , qoResult = JSNull
               , qoStartTimestamp = Nothing
               , qoEndTimestamp = Nothing
               , qoExecTimestamp = Nothing
               }

-- | From a job-id and a list of op-codes create a job. This is
-- the pure part of job creation, as allocating a new job id
-- lives in IO.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops = do
  ops' <- mapM (`resolveDependencies` jobid) ops
  return QueuedJob { qjId = jobid
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
                   , qjReceivedTimestamp = Nothing
                   , qjStartTimestamp = Nothing
                   , qjEndTimestamp = Nothing
                   }

-- | Attach a received timestamp to a Queued Job.
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }
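-- A usage sketch (comment only, not part of the original module): the pure
-- job construction combined with the timestamp helpers above; 'metaops'
-- stands for whatever opcode list the caller wants to submit.
--
-- > freshJob :: JobId -> [MetaOpCode] -> IO QueuedJob
-- > freshJob jid metaops = do
-- >   now <- currentTimestamp
-- >   job <- queuedJobFromOpCodes jid metaops
-- >   return $ setReceivedTimestamp now job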

-- | Set the state of a QueuedOpCode to canceled.
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
cancelOpCode now op =
  op { qoStatus = OP_STATUS_CANCELED, qoEndTimestamp = Just now }

-- | Transform a QueuedJob that has not been started into its canceled form.
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
cancelQueuedJob now job =
  let ops' = map (cancelOpCode now) $ qjOps job
  in job { qjOps = ops', qjEndTimestamp = Just now}
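-- A cancellation sketch (comment only, not part of the original module):
-- cancelling a job that never left the queue stamps the job and all its
-- opcodes with the same end timestamp.
--
-- > cancelUnstarted :: QueuedJob -> IO QueuedJob
-- > cancelUnstarted job = do
-- >   now <- currentTimestamp
-- >   return $ cancelQueuedJob now job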

-- | Job file prefix.
jobFilePrefix :: String
jobFilePrefix = "job-"

-- | Computes the filename for a given job ID.
jobFileName :: JobId -> FilePath
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)

-- | Parses a job ID from a file name.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  case stripPrefix jobFilePrefix path of
    Nothing -> fail $ "Job file '" ++ path ++
                      "' doesn't have the correct prefix"
    Just suffix -> makeJobIdS suffix

-- | Computes the full path to a live job.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir jid = rootdir </> jobFileName jid

-- | Computes the full path to an archived job.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid

-- | Map from opcode status to job status.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR

-- | Computes a queued job's status.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
    where
      terminalStatus OP_STATUS_ERROR     = True
      terminalStatus OP_STATUS_CANCELING = True
      terminalStatus OP_STATUS_CANCELED  = True
      terminalStatus _                   = False
      softStatus     OP_STATUS_SUCCESS   = True
      softStatus     OP_STATUS_QUEUED    = True
      softStatus     _                   = False
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
      extractOpSt [] d False = d
      extractOpSt (x:xs) d old_all
           | terminalStatus x = opStatusToJob x -- abort recursion
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
           where new_all = x == OP_STATUS_SUCCESS && old_all
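-- A status-aggregation sketch (comment only, not part of the original
-- module), spelling out the folding rules of 'calcJobStatus' above:
--
-- > -- all opcodes OP_STATUS_SUCCESS                      -> JOB_STATUS_SUCCESS
-- > -- first terminal opcode (error/canceling/canceled)   -> its job status
-- > -- only queued/succeeded opcodes, not all successful  -> JOB_STATUS_QUEUED
-- > -- e.g. statuses [SUCCESS, RUNNING, QUEUED]           -> JOB_STATUS_RUNNING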

-- | Determine if a job has started
jobStarted :: QueuedJob -> Bool
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus

-- | Determine if a job is finalised.
jobFinalized :: QueuedJob -> Bool
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus

-- | Determine whether an opcode status is finalized.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized = (> OP_STATUS_RUNNING)

-- | Compute a job's priority.
calcJobPriority :: QueuedJob -> Int
calcJobPriority QueuedJob { qjOps = ops } =
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
    where helper [] = C.opPrioDefault
          helper ps = minimum ps
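-- A priority sketch (comment only, not part of the original module):
-- finalized opcodes no longer influence the job priority, and a job with
-- no pending opcodes falls back to the default priority. Assuming Ganeti's
-- convention that numerically smaller values mean higher priority, the
-- most urgent job in a list can be picked like this:
--
-- > mostUrgent :: [QueuedJob] -> Maybe QueuedJob
-- > mostUrgent [] = Nothing
-- > mostUrgent js = Just $ minimumBy (comparing calcJobPriority) js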

-- | Log but ignore an 'IOError'.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  unless (isDoesNotExistError e && ignore_noent) .
    logWarning $ msg ++ ": " ++ show e
  return a

-- | Compute the list of existing archive directories. Note that I/O
-- exceptions are swallowed and ignored.
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  filterM (\path ->
             liftM isDirectory (getFileStatus (adir </> path))
               `Control.Exception.catch`
               ignoreIOError False True
                 ("Failed to stat archive path " ++ path)) fpaths

-- | Build list of directories containing job files. Note: compared to
-- the Python version, this doesn't ignore a potential lost+found
-- file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived = do
  other <- if archived
             then allArchiveDirs rootdir
             else return []
  return $ rootdir:other

-- Function equivalent to the \'sequence\' function, which cannot be used
-- because of a library version conflict on Lucid.
-- FIXME: delete this and just use \'sequence\' instead when Lucid
-- compatibility is no longer required.
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
sequencer l = fmap reverse $ foldl seqFolder (Right []) l

-- | Folding function for joining multiple [JobIds] into one list.
seqFolder :: Either IOError [[JobId]]
          -> Either IOError [JobId]
          -> Either IOError [[JobId]]
seqFolder (Left e) _ = Left e
seqFolder (Right _) (Left e) = Left e
seqFolder (Right l) (Right el) = Right $ el:l

-- | Computes the list of all jobs in the given directories.
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)

-- | Sorts a list of job IDs.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy (comparing fromJobId)
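-- A listing sketch (comment only, not part of the original module):
-- collect and sort all job IDs below a queue directory, including the
-- archive subdirectories; 'queueDir' stands for the queue root path.
--
-- > listAllJobs :: FilePath -> IO (Either IOError [JobId])
-- > listAllJobs queueDir = do
-- >   dirs <- determineJobDirectories queueDir True
-- >   liftM (fmap sortJobIDs) (getJobIDs dirs)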

-- | Computes the list of jobs in a given directory.
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
getDirJobIDs path = do
  either_contents <-
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
  case either_contents of
    Left e -> do
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
      return $ Left e
    Right contents -> do
      let jids = foldl (\ids file ->
                         case parseJobFileId file of
                           Nothing -> ids
                           Just new_id -> new_id:ids) [] contents
      return . Right $ reverse jids

-- | Reads the job data from disk.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid = do
  let live_path = liveJobFile rootdir jid
      archived_path = archivedJobFile rootdir jid
      all_paths = if archived
                    then [(live_path, False), (archived_path, True)]
                    else [(live_path, False)]
  foldM (\state (path, isarchived) ->
           liftM (\r -> Just (r, isarchived)) (readFile path)
             `Control.Exception.catch`
             ignoreIOError state True
               ("Failed to read job file " ++ path)) Nothing all_paths

-- | Failed to load job error.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"

-- | Loads a job from disk.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: we need some strictness below, otherwise the wrapping in a
  -- Result will create too much laziness, and not close the file
  -- descriptors for the individual jobs
  return $! case raw of
             Nothing -> noSuchJob
             Just (str, arch) ->
               liftM (\qj -> (qj, arch)) .
               fromJResult "Parsing job file" $ Text.JSON.decode str
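-- A loading sketch (comment only, not part of the original module): load a
-- job, looking into the archive as well, and compute its status; 'queueDir'
-- stands for the queue root path.
--
-- > statusFromDisk :: FilePath -> JobId -> IO (Result JobStatus)
-- > statusFromDisk queueDir jid = do
-- >   res <- loadJobFromDisk queueDir True jid
-- >   return $ liftM (calcJobStatus . fst) res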

-- | Write a job to disk.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
writeJobToDisk rootdir job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  tryAndLogIOError (atomicWriteFile filename content)
                   ("Failed to write " ++ filename) Ok

-- | Replicate a job to all master candidates.
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
replicateJob rootdir mastercandidates job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  filename' <- makeVirtualPath filename
  callresult <- executeRpcCall mastercandidates
                  $ RpcCallJobqueueUpdate filename' content
  let result = map (second (() <$)) callresult
  logRpcErrors result
  return result

-- | Replicate many jobs to all master candidates.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates =
  mapM_ (replicateJob rootdir mastercandidates)
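-- A persistence sketch (comment only, not part of the original module):
-- write a job locally and then push it to the master candidates; 'queueDir'
-- and 'candidates' stand for the caller's queue path and node list.
--
-- > persistJob :: FilePath -> [Node] -> QueuedJob -> IO (Result ())
-- > persistJob queueDir candidates job = do
-- >   res <- writeJobToDisk queueDir job
-- >   replicateManyJobs queueDir candidates [job]
-- >   return res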

-- | Read the job serial number from disk.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk = do
  filename <- jobQueueSerialFile
  tryAndLogIOError (readFile filename) "Failed to read serial file"
                   (makeJobIdS . rStripSpace)

-- | Allocate new job ids.
-- To avoid races while accessing the serial file, the threads synchronize
-- over a lock, provided as usual by an MVar.
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n =
  if n <= 0
    then return . Bad $ "Can only allocate positive number of job ids"
    else do
      takeMVar lock
      rjobid <- readSerialFromDisk
      case rjobid of
        Bad s -> do
          putMVar lock ()
          return . Bad $ s
        Ok jid -> do
          let current = fromJobId jid
              serial_content = show (current + n) ++  "\n"
          serial <- jobQueueSerialFile
          write_result <- try $ atomicWriteFile serial serial_content
                          :: IO (Either IOError ())
          case write_result of
            Left e -> do
              putMVar lock ()
              let msg = "Failed to write serial file: " ++ show e
              logError msg
              return . Bad $ msg
            Right () -> do
              serial' <- makeVirtualPath serial
              _ <- executeRpcCall mastercandidates
                     $ RpcCallJobqueueUpdate serial' serial_content
              putMVar lock ()
              return $ mapM makeJobId [(current+1)..(current+n)]

-- | Allocate one new job id.
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
allocateJobId mastercandidates lock = do
  jids <- allocateJobIds mastercandidates lock 1
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")
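-- An allocation sketch (comment only, not part of the original module):
-- the serial-file lock is created once (e.g. with @newMVar ()@) and shared
-- by every allocation, and reserving ids for a whole batch of jobs is a
-- single serial-file update; 'candidates' and 'queueLock' stand for the
-- caller's master-candidate list and MVar.
--
-- > allocateForBatch :: [Node] -> MVar () -> [[MetaOpCode]]
-- >                  -> IO (Result [JobId])
-- > allocateForBatch candidates queueLock batch =
-- >   allocateJobIds candidates queueLock (length batch)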

-- | Decide if the job queue is open.
isQueueOpen :: IO Bool
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)

-- | Start enqueued jobs, currently by handing them over to masterd.
startJobs :: [QueuedJob] -> IO ()
startJobs jobs = do
  socketpath <- defaultMasterSocket
  client <- getLuxiClient socketpath
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
  let failures = map show $ justBad pickupResults
  unless (null failures)
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures

-- | Try to cancel a job that has already been handed over to execution,
-- currently by asking masterd to cancel it.
cancelJob :: JobId -> IO (ErrorResult JSValue)
cancelJob jid = do
  socketpath <- defaultMasterSocket
  client <- getLuxiClient socketpath
  callMethod (CancelJob jid) client
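-- An end-to-end sketch (comment only, not part of the original module) of
-- the submission flow built from the helpers above: check that the queue
-- is open, allocate an id, build, stamp, persist, replicate and start the
-- job; error handling is elided for brevity. 'queueDir', 'candidates' and
-- 'queueLock' are the caller's queue path, master-candidate list and
-- serial-file lock.
--
-- > submitJob :: FilePath -> [Node] -> MVar () -> [MetaOpCode]
-- >           -> IO (Result JobId)
-- > submitJob queueDir candidates queueLock metaops = do
-- >   open <- isQueueOpen
-- >   if not open
-- >     then return $ Bad "Job queue is drained"
-- >     else do
-- >       rjid <- allocateJobId candidates queueLock
-- >       case rjid of
-- >         Bad msg -> return $ Bad msg
-- >         Ok jid -> do
-- >           now <- currentTimestamp
-- >           job <- liftM (setReceivedTimestamp now)
-- >                        (queuedJobFromOpCodes jid metaops)
-- >           _ <- writeJobToDisk queueDir job
-- >           replicateManyJobs queueDir candidates [job]
-- >           startJobs [job]
-- >           return $ Ok jid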