Revision 48e4da5c

b/Makefile.am
689 689
	src/Ganeti/Hs2Py/GenOpCodes.hs \
690 690
	src/Ganeti/Hs2Py/OpDoc.hs \
691 691
	src/Ganeti/JQueue.hs \
692
	src/Ganeti/JQScheduler.hs \
692 693
	src/Ganeti/JSON.hs \
693 694
	src/Ganeti/Jobs.hs \
694 695
	src/Ganeti/Logging.hs \
b/src/Ganeti/Constants.hs
3857 3857
partReserved :: Double
3858 3858
partReserved = 0.02
3859 3859

  
3860
-- * Luxid job scheduling
3861

  
3862
-- | Time interval in seconds for polling updates on the job queue. This
3863
-- interval is only relevant if the number of running jobs reaches the maximal
3864
-- allowed number, as otherwise new jobs will be started immediately anyway.
3865
luxidJobqueuePollInterval :: Int
3866
luxidJobqueuePollInterval = 3
3867

  
3868
-- | Maximal number of jobs to be running at the same time. Once this number is
3869
-- reached, new jobs will just be queued and only started, once some of the
3870
-- other jobs have finished.
3871
luxidMaximalRunningJobs :: Int
3872
luxidMaximalRunningJobs = 20
3873

  
3860 3874
-- * Confd
3861 3875

  
3862 3876
confdProtocolVersion :: Int
b/src/Ganeti/JQScheduler.hs
1
{-| Implementation of a reader for the job queue.
2

  
3
-}
4

  
5
{-
6

  
7
Copyright (C) 2013 Google Inc.
8

  
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

  
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

  
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

  
24
-}
25

  
26
module Ganeti.JQScheduler
27
  ( JQStatus
28
  , emptyJQStatus
29
  , initJQScheduler
30
  , enqueueNewJobs
31
  ) where
32

  
33
import Control.Arrow
34
import Control.Concurrent
35
import Control.Monad
36
import Data.List
37
import Data.IORef
38

  
39
import Ganeti.BasicTypes
40
import Ganeti.Constants as C
41
import Ganeti.JQueue as JQ
42
import Ganeti.Logging
43
import Ganeti.Path
44
import Ganeti.Types
45
import Ganeti.Utils
46

  
47
-- | A job from the queue, paired with the fstat result last observed for
-- its job file.
data JobWithStat = JobWithStat
  { jStat :: FStat
    -- ^ the fstat result recorded for the job file
  , jJob :: QueuedJob
    -- ^ the job itself
  }
48
-- | The two lists of jobs the scheduler keeps track of.
data Queue = Queue
  { qEnqueued :: [JobWithStat]
    -- ^ jobs that are enqueued, but not yet handed over for execution
  , qRunning :: [JobWithStat]
    -- ^ jobs already handed over for execution
  }
49

  
50
{-| State of the job scheduler.

The state consists of a single IORef holding a 'Queue', i.e., the list
of enqueued jobs and the list of jobs handed over for execution (each
job together with the last fstat result observed for it). Both lists
share one IORef on purpose: moving jobs from one list to the other, as
scheduling does, can then be done in one atomic update.

-}
data JQStatus = JQStatus { jqJobs :: IORef Queue }
63

  
64

  
65
-- | Create a fresh scheduler state in which no job is enqueued and no
-- job is running.
emptyJQStatus :: IO JQStatus
emptyJQStatus =
  fmap (\ref -> JQStatus { jqJobs = ref })
    . newIORef $ Queue { qEnqueued = [], qRunning = [] }
69

  
70
-- | Lift a transformation of the list of running jobs to the whole queue.
-- The list of enqueued jobs is left untouched.
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onRunningJobs transform q@(Queue { qRunning = running }) =
  q { qRunning = transform running }
73

  
74
-- | Lift a transformation of the list of enqueued jobs to the whole queue.
-- The list of running jobs is left untouched.
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onQueuedJobs transform q@(Queue { qEnqueued = waiting }) =
  q { qEnqueued = transform waiting }
77

  
78
-- | Wrap a 'QueuedJob' into a 'JobWithStat', using 'nullFStat' as the
-- recorded file status.
unreadJob :: QueuedJob -> JobWithStat
unreadJob = JobWithStat nullFStat
81

  
82
-- | Delay, in microseconds, between two polls of the running jobs for
-- updates. Derived from the poll interval constant, which is in seconds.
watchInterval :: Int
watchInterval = microsecondsPerSecond * C.luxidJobqueuePollInterval
  where microsecondsPerSecond = 1000000
85

  
86
-- | Upper bound on the number of jobs that may be handed over for
-- execution at the same time, as given by the corresponding constant.
maxRunningJobs :: Int
maxRunningJobs = C.luxidMaximalRunningJobs
89

  
90
-- | Atomically replace the queue stored in the scheduler state by the
-- result of applying the given pure function to it.
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
modifyJobs qstat update =
  atomicModifyIORef (jqJobs qstat) $ \queue -> (update queue, ())
93

  
94
-- | Reread a job from disk, if the file has changed.
--
-- The job file is compared against the fstat recorded in the given
-- 'JobWithStat'. The result is @Nothing@ if the file is reported as
-- unchanged, or if loading the job failed (in which case a warning is
-- logged). Otherwise, the freshly loaded job is returned together with
-- the new fstat value.
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
readJobStatus (JobWithStat {jStat=fstat, jJob=job}) = do
  let jid = qjId job
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
  changed <- needsReload fstat fpath
  case changed of
    Nothing -> do
      logDebug $ "File " ++ fpath ++ " not changed on disk."
      return Nothing
    Just fstat' -> do
      logInfo $ "Rereading " ++ fpath
      readResult <- loadJobFromDisk qdir False jid
      let jids = show $ fromJobId jid
      case readResult of
        Bad s -> do
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
          return Nothing
        Ok (job', _) -> do
          logDebug $ "Read job " ++ jids ++ ", status is "
                       ++ show (calcJobStatus job')
          return . Just $ JobWithStat {jStat=fstat', jJob=job'}
118

  
119
-- | Insert a previously read change of a job into a list of jobs. This
-- is a pure function: every entry with the same job id as the given job
-- is replaced by it; if there is no such entry, the list is returned
-- unchanged. As the change carries its own fstat value, a later read
-- change overwriting a newer read state is not a problem: the stored
-- fstat value will then be outdated, so the next poller run will fix this.
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
updateJobStatus fresh = map replaceIfSame
  where
    freshId = qjId (jJob fresh)
    replaceIfSame old
      | qjId (jJob old) == freshId = fresh
      | otherwise = old
128

  
129
-- | Bring a single running job up to date: reread it from disk and, if
-- that yielded a new version, store it in the list of running jobs.
updateJob :: JQStatus -> JobWithStat -> IO ()
updateJob state jb = do
  reread <- readJobStatus jb
  case reread of
    Nothing -> return ()
    Just jb' -> modifyJobs state . onRunningJobs $ updateJobStatus jb'
134

  
135
-- | Pure part of removing finished jobs from the monitored part of the
-- queue. A running job is kept as long as its status is at most
-- JOB_STATUS_RUNNING; all other running jobs are removed. The result is
-- the remaining queue together with the removed jobs.
sortoutFinishedJobs :: Queue -> (Queue, [QueuedJob])
sortoutFinishedJobs queue = (queue { qRunning = active }, map jJob finished)
  where
    notFinished jws = calcJobStatus (jJob jws) <= JOB_STATUS_RUNNING
    (active, finished) = partition notFinished (qRunning queue)
144

  
145
-- | IO wrapper around the pure `sortoutFinishedJobs`: atomically drop the
-- finished jobs from the queue and, if there were any, log their ids
-- together with their final status.
cleanupFinishedJobs :: JQStatus -> IO ()
cleanupFinishedJobs qstate = do
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
  let describe job = show (fromJobId (qjId job), calcJobStatus job)
  unless (null finished)
    . logInfo $ "Finished jobs: " ++ commaJoin (map describe finished)
154

  
155
-- | Pure scheduling decision. Jobs are taken from the front of the list
-- of enqueued jobs, as many as there are free slots below the maximal
-- number of running jobs, and appended to the list of running jobs. The
-- result is the updated queue together with the jobs chosen.
selectJobsToRun :: Queue -> (Queue, [QueuedJob])
selectJobsToRun queue =
  ( queue { qEnqueued = waiting, qRunning = running ++ starting }
  , map jJob starting )
  where
    running = qRunning queue
    freeSlots = maxRunningJobs - length running
    (starting, waiting) = splitAt freeSlots (qEnqueued queue)
163

  
164
-- | Schedule jobs to be run. This is the IO wrapper around the
-- pure `selectJobsToRun`: the selection is applied atomically to the
-- queue, the ids of the chosen jobs are logged (if there are any), and
-- the chosen jobs are passed on to be started.
scheduleSomeJobs :: JQStatus -> IO ()
scheduleSomeJobs qstate = do
  chosen <- atomicModifyIORef (jqJobs qstate) selectJobsToRun
  unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin
    $ map (show . fromJobId . qjId) chosen
  -- Called unconditionally, i.e., also with an empty list of jobs.
  JQ.startJobs chosen
172

  
173
-- | Render the queue as a short, human readable string, listing the ids
-- of the waiting jobs followed by the ids of the running jobs.
showQueue :: Queue -> String
showQueue (Queue { qEnqueued = enqueued, qRunning = started }) =
  "Waiting jobs: " ++ idsOf enqueued
    ++ "; running jobs: " ++ idsOf started
  where idsOf jobs = show [fromJobId (qjId (jJob j)) | j <- jobs]
179

  
180
-- | Time-based watcher for the job queue. Runs forever; each round waits
-- for the watch interval, updates all running jobs from disk, removes
-- the finished ones, logs the resulting queue, and finally tries to
-- schedule more jobs.
onTimeWatcher :: JQStatus -> IO ()
onTimeWatcher qstate = forever pollOnce
  where
    queueRef = jqJobs qstate
    pollOnce = do
      threadDelay watchInterval
      logDebug "Watcher timer fired"
      readIORef queueRef >>= mapM_ (updateJob qstate) . qRunning
      cleanupFinishedJobs qstate
      readIORef queueRef >>= logInfo . showQueue
      scheduleSomeJobs qstate
191

  
192
-- | Set up the job scheduler by forking the thread that runs the
-- time-based watcher on the given scheduler state. Returns as soon as
-- the thread is forked.
initJQScheduler :: JQStatus -> IO ()
initJQScheduler qstate =
  logInfo "Starting time-based job queue watcher"
    >> forkIO (onTimeWatcher qstate)
    >> return ()
199

  
200
-- | Enqueue new jobs. The jobs are logged, appended to the end of the
-- list of enqueued jobs, and a scheduling run is triggered right away.
-- Jobs that are not started by that run stay enqueued, so that they
-- will be executed eventually.
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
enqueueNewJobs state jobs = do
  let jobIds = [show (fromJobId (qjId job)) | job <- jobs]
  logInfo $ "New jobs enqueued: " ++ commaJoin jobIds
  modifyJobs state . onQueuedJobs $ \queued -> queued ++ map unreadJob jobs
  scheduleSomeJobs state

Also available in: Unified diff