Revision e92376d7
b/daemons/ganeti-masterd | ||
---|---|---|
261 | 261 |
""" |
262 | 262 |
proc = mcpu.Processor(self.server.context) |
263 | 263 |
# TODO: Where should log messages go? |
264 |
return proc.ExecOpCode(op, self._DummyLog) |
|
264 |
return proc.ExecOpCode(op, self._DummyLog, None)
|
|
265 | 265 |
|
266 | 266 |
|
267 | 267 |
class GanetiContext(object): |
b/lib/constants.py | ||
---|---|---|
304 | 304 |
|
305 | 305 |
# Job status |
306 | 306 |
JOB_STATUS_QUEUED = "queued" |
307 |
JOB_STATUS_WAITLOCK = "waiting" |
|
307 | 308 |
JOB_STATUS_RUNNING = "running" |
308 | 309 |
JOB_STATUS_CANCELED = "canceled" |
309 | 310 |
JOB_STATUS_SUCCESS = "success" |
310 | 311 |
JOB_STATUS_ERROR = "error" |
311 | 312 |
|
312 | 313 |
OP_STATUS_QUEUED = "queued" |
314 |
OP_STATUS_WAITLOCK = "waiting" |
|
313 | 315 |
OP_STATUS_RUNNING = "running" |
314 | 316 |
OP_STATUS_CANCELED = "canceled" |
315 | 317 |
OP_STATUS_SUCCESS = "success" |
b/lib/jqueue.py | ||
---|---|---|
159 | 159 |
|
160 | 160 |
if op.status == constants.OP_STATUS_QUEUED: |
161 | 161 |
pass |
162 |
elif op.status == constants.OP_STATUS_WAITLOCK: |
|
163 |
status = constants.JOB_STATUS_WAITLOCK |
|
162 | 164 |
elif op.status == constants.OP_STATUS_RUNNING: |
163 | 165 |
status = constants.JOB_STATUS_RUNNING |
164 | 166 |
elif op.status == constants.OP_STATUS_ERROR: |
... | ... | |
188 | 190 |
|
189 | 191 |
|
190 | 192 |
class _JobQueueWorker(workerpool.BaseWorker): |
193 |
def _NotifyStart(self): |
|
194 |
"""Mark the opcode as running, not lock-waiting. |
|
195 |
|
|
196 |
This is called from the mcpu code as a notifier function, when the |
|
197 |
LU is finally about to start the Exec() method. Of course, to have |
|
198 |
end-user visible results, the opcode must be initially (before |
|
199 |
calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK. |
|
200 |
|
|
201 |
""" |
|
202 |
assert self.queue, "Queue attribute is missing" |
|
203 |
assert self.opcode, "Opcode attribute is missing" |
|
204 |
|
|
205 |
self.queue.acquire() |
|
206 |
try: |
|
207 |
self.opcode.status = constants.OP_STATUS_RUNNING |
|
208 |
finally: |
|
209 |
self.queue.release() |
|
210 |
|
|
191 | 211 |
def RunTask(self, job): |
192 | 212 |
"""Job executor. |
193 | 213 |
|
... | ... | |
198 | 218 |
logging.debug("Worker %s processing job %s", |
199 | 219 |
self.worker_id, job.id) |
200 | 220 |
proc = mcpu.Processor(self.pool.queue.context) |
201 |
queue = job.queue |
|
221 |
self.queue = queue = job.queue
|
|
202 | 222 |
try: |
203 | 223 |
try: |
204 | 224 |
count = len(job.ops) |
... | ... | |
209 | 229 |
queue.acquire() |
210 | 230 |
try: |
211 | 231 |
job.run_op_index = idx |
212 |
op.status = constants.OP_STATUS_RUNNING
|
|
232 |
op.status = constants.OP_STATUS_WAITLOCK
|
|
213 | 233 |
op.result = None |
214 | 234 |
op.start_timestamp = TimeStampNow() |
215 | 235 |
if idx == 0: # first opcode |
... | ... | |
246 | 266 |
queue.release() |
247 | 267 |
|
248 | 268 |
# Make sure not to hold lock while _Log is called |
249 |
result = proc.ExecOpCode(input_opcode, _Log) |
|
269 |
self.opcode = op |
|
270 |
result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart) |
|
250 | 271 |
|
251 | 272 |
queue.acquire() |
252 | 273 |
try: |
... | ... | |
365 | 386 |
if status in (constants.JOB_STATUS_QUEUED, ): |
366 | 387 |
self._wpool.AddTask(job) |
367 | 388 |
|
368 |
elif status in (constants.JOB_STATUS_RUNNING, ): |
|
389 |
elif status in (constants.JOB_STATUS_RUNNING, |
|
390 |
constants.JOB_STATUS_WAITLOCK): |
|
369 | 391 |
logging.warning("Unfinished job %s found: %s", job.id, job) |
370 | 392 |
try: |
371 | 393 |
for op in job.ops: |
... | ... | |
621 | 643 |
log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) |
622 | 644 |
|
623 | 645 |
if status not in (constants.JOB_STATUS_QUEUED, |
624 |
constants.JOB_STATUS_RUNNING): |
|
646 |
constants.JOB_STATUS_RUNNING, |
|
647 |
constants.JOB_STATUS_WAITLOCK): |
|
625 | 648 |
# Don't even try to wait if the job is no longer running, there will be |
626 | 649 |
# no changes. |
627 | 650 |
break |
b/lib/mcpu.py | ||
---|---|---|
131 | 131 |
adding_locks = level in lu.add_locks |
132 | 132 |
acquiring_locks = level in lu.needed_locks |
133 | 133 |
if level not in locking.LEVELS: |
134 |
if callable(self._run_notifier): |
|
135 |
self._run_notifier() |
|
134 | 136 |
result = self._ExecLU(lu) |
135 | 137 |
elif adding_locks and acquiring_locks: |
136 | 138 |
# We could both acquire and add locks at the same level, but for now we |
... | ... | |
170 | 172 |
|
171 | 173 |
return result |
172 | 174 |
|
173 |
def ExecOpCode(self, op, feedback_fn): |
|
175 |
def ExecOpCode(self, op, feedback_fn, run_notifier):
|
|
174 | 176 |
"""Execute an opcode. |
175 | 177 |
|
176 |
Args: |
|
177 |
op: the opcode to be executed |
|
178 |
@type op: an OpCode instance |
|
179 |
@param op: the opcode to be executed |
|
180 |
@type feedback_fn: a function that takes a single argument |
|
181 |
@param feedback_fn: this function will be used as feedback from the LU |
|
182 |
code to the end-user |
|
183 |
@type run_notifier: callable (no arguments) or None |
|
184 |
@param run_notifier: this function (if callable) will be called when |
|
185 |
we are about to call the lu's Exec() method, that |
|
186 |
is, after we have aquired all locks |
|
178 | 187 |
|
179 | 188 |
""" |
180 | 189 |
if not isinstance(op, opcodes.OpCode): |
... | ... | |
182 | 191 |
" to ExecOpcode") |
183 | 192 |
|
184 | 193 |
self._feedback_fn = feedback_fn |
194 |
self._run_notifier = run_notifier |
|
185 | 195 |
lu_class = self.DISPATCH_TABLE.get(op.__class__, None) |
186 | 196 |
if lu_class is None: |
187 | 197 |
raise errors.OpCodeUnknown("Unknown opcode") |
b/scripts/gnt-job | ||
---|---|---|
38 | 38 |
|
39 | 39 |
_USER_JOB_STATUS = { |
40 | 40 |
constants.JOB_STATUS_QUEUED: "queued", |
41 |
constants.JOB_STATUS_WAITLOCK: "waiting", |
|
41 | 42 |
constants.JOB_STATUS_RUNNING: "running", |
42 | 43 |
constants.JOB_STATUS_CANCELED: "canceled", |
43 | 44 |
constants.JOB_STATUS_SUCCESS: "success", |
Also available in: Unified diff