Revision a2977f53 src/Ganeti/JQScheduler.hs

b/src/Ganeti/JQScheduler.hs
143 143
-- | Sort out the finished jobs from the monitored part of the queue.
144 144
-- This is the pure part, splitting the queue into a remaining queue
145 145
-- and the jobs that were removed.
146
sortoutFinishedJobs :: Queue -> (Queue, [QueuedJob])
146
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
147 147
sortoutFinishedJobs queue =
148 148
  let (fin, run') = partition (jobFinalized . jJob) . qRunning $ queue
149
  in (queue {qRunning=run'}, map jJob fin)
149
  in (queue {qRunning=run'}, fin)
150 150

  
151 151
-- | Actually clean up the finished jobs. This is the IO wrapper around
152 152
-- the pure `sortoutFinishedJobs`.
153 153
cleanupFinishedJobs :: JQStatus -> IO ()
154 154
cleanupFinishedJobs qstate = do
155 155
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
156
  let showJob = show . ((fromJobId . qjId) &&& calcJobStatus)
156
  let showJob = show . ((fromJobId . qjId) &&& calcJobStatus) . jJob
157 157
      jlist = commaJoin $ map showJob finished
158 158
  unless (null finished)
159 159
    . logInfo $ "Finished jobs: " ++ jlist
......
161 161

  
162 162
-- | Decide on which jobs to schedule next for execution. This is the
163 163
-- pure function doing the scheduling.
164
selectJobsToRun :: Queue -> (Queue, [QueuedJob])
164
selectJobsToRun :: Queue -> (Queue, [JobWithStat])
165 165
selectJobsToRun queue =
166 166
  let n = maxRunningJobs - length (qRunning queue)
167 167
      (chosen, remain) = splitAt n (qEnqueued queue)
168
  in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}
169
     , map jJob chosen)
168
  in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}, chosen)
170 169

  
171 170
-- | Requeue jobs that were previously selected for execution
172 171
-- but couldn't be started.
173
requeueJobs :: JQStatus -> [QueuedJob] -> IOError -> IO ()
172
requeueJobs :: JQStatus -> [JobWithStat] -> IOError -> IO ()
174 173
requeueJobs qstate jobs err = do
175
  let jids = map qjId jobs
174
  let jids = map (qjId . jJob) jobs
176 175
      jidsString = commaJoin $ map (show . fromJobId) jids
177 176
      rmJobs = filter ((`notElem` jids) . qjId . jJob)
178 177
  logWarning $ "Starting jobs failed: " ++ show err
179 178
  logWarning $ "Rescheduling jobs: " ++ jidsString
180 179
  modifyJobs qstate (onRunningJobs rmJobs)
181
  modifyJobs qstate (onQueuedJobs . (++) $ map unreadJob jobs)
180
  modifyJobs qstate (onQueuedJobs $ (++) jobs)
182 181

  
183 182
-- | Schedule jobs to be run. This is the IO wrapper around the
184 183
-- pure `selectJobsToRun`.
185 184
scheduleSomeJobs :: JQStatus -> IO ()
186 185
scheduleSomeJobs qstate = do
187 186
  chosen <- atomicModifyIORef (jqJobs qstate) selectJobsToRun
187
  let jobs = map jJob chosen
188 188
  unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin
189
    $ map (show . fromJobId . qjId) chosen
190
  result <- try $ JQ.startJobs chosen
189
    $ map (show . fromJobId . qjId) jobs
190
  result <- try $ JQ.startJobs jobs
191 191
  either (requeueJobs qstate chosen) return result
192 192

  
193 193
-- | Format the job queue status in a compact, human readable way.

Also available in: Unified diff