Revision 031a3e57
b/daemons/ganeti-masterd | ||
---|---|---|
316 | 316 |
logging.info("Received invalid request '%s'", method) |
317 | 317 |
raise ValueError("Invalid operation '%s'" % method) |
318 | 318 |
|
319 |
def _DummyLog(self, *args): |
|
320 |
pass |
|
321 |
|
|
322 | 319 |
def _Query(self, op): |
323 | 320 |
"""Runs the specified opcode and returns the result. |
324 | 321 |
|
325 | 322 |
""" |
326 | 323 |
proc = mcpu.Processor(self.server.context) |
327 |
# TODO: Where should log messages go? |
|
328 |
return proc.ExecOpCode(op, self._DummyLog, None) |
|
324 |
return proc.ExecOpCode(op, None) |
|
329 | 325 |
|
330 | 326 |
|
331 | 327 |
class GanetiContext(object): |
b/lib/jqueue.py | ||
---|---|---|
334 | 334 |
not_marked = False |
335 | 335 |
|
336 | 336 |
|
337 |
class _JobQueueWorker(workerpool.BaseWorker): |
|
338 |
"""The actual job workers. |
|
337 |
class _OpCodeExecCallbacks(mcpu.OpExecCbBase): |
|
338 |
def __init__(self, queue, job, op): |
|
339 |
"""Initializes this class. |
|
339 | 340 |
|
340 |
""" |
|
341 |
def _NotifyStart(self): |
|
341 |
@type queue: L{JobQueue} |
|
342 |
@param queue: Job queue |
|
343 |
@type job: L{_QueuedJob} |
|
344 |
@param job: Job object |
|
345 |
@type op: L{_QueuedOpCode} |
|
346 |
@param op: OpCode |
|
347 |
|
|
348 |
""" |
|
349 |
assert queue, "Queue is missing" |
|
350 |
assert job, "Job is missing" |
|
351 |
assert op, "Opcode is missing" |
|
352 |
|
|
353 |
self._queue = queue |
|
354 |
self._job = job |
|
355 |
self._op = op |
|
356 |
|
|
357 |
def NotifyStart(self): |
|
342 | 358 |
"""Mark the opcode as running, not lock-waiting. |
343 | 359 |
|
344 |
This is called from the mcpu code as a notifier function, when the |
|
345 |
LU is finally about to start the Exec() method. Of course, to have
|
|
346 |
end-user visible results, the opcode must be initially (before
|
|
347 |
calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
|
|
360 |
This is called from the mcpu code as a notifier function, when the LU is
|
|
361 |
finally about to start the Exec() method. Of course, to have end-user
|
|
362 |
visible results, the opcode must be initially (before calling into
|
|
363 |
Processor.ExecOpCode) set to OP_STATUS_WAITLOCK. |
|
348 | 364 |
|
349 | 365 |
""" |
350 |
assert self.queue, "Queue attribute is missing" |
|
351 |
assert self.opcode, "Opcode attribute is missing" |
|
352 |
|
|
353 |
self.queue.acquire() |
|
366 |
self._queue.acquire() |
|
354 | 367 |
try: |
355 |
assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
|
|
356 |
constants.OP_STATUS_CANCELING)
|
|
368 |
assert self._op.status in (constants.OP_STATUS_WAITLOCK,
|
|
369 |
constants.OP_STATUS_CANCELING) |
|
357 | 370 |
|
358 | 371 |
# Cancel here if we were asked to |
359 |
if self.opcode.status == constants.OP_STATUS_CANCELING:
|
|
372 |
if self._op.status == constants.OP_STATUS_CANCELING:
|
|
360 | 373 |
raise CancelJob() |
361 | 374 |
|
362 |
self.opcode.status = constants.OP_STATUS_RUNNING
|
|
375 |
self._op.status = constants.OP_STATUS_RUNNING
|
|
363 | 376 |
finally: |
364 |
self.queue.release() |
|
377 |
self._queue.release() |
|
378 |
|
|
379 |
def Feedback(self, *args): |
|
380 |
"""Append a log entry. |
|
381 |
|
|
382 |
""" |
|
383 |
assert len(args) < 3 |
|
365 | 384 |
|
385 |
if len(args) == 1: |
|
386 |
log_type = constants.ELOG_MESSAGE |
|
387 |
log_msg = args[0] |
|
388 |
else: |
|
389 |
(log_type, log_msg) = args |
|
390 |
|
|
391 |
# The time is split to make serialization easier and not lose |
|
392 |
# precision. |
|
393 |
timestamp = utils.SplitTime(time.time()) |
|
394 |
|
|
395 |
self._queue.acquire() |
|
396 |
try: |
|
397 |
self._job.log_serial += 1 |
|
398 |
self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg)) |
|
399 |
|
|
400 |
self._job.change.notifyAll() |
|
401 |
finally: |
|
402 |
self._queue.release() |
|
403 |
|
|
404 |
|
|
405 |
class _JobQueueWorker(workerpool.BaseWorker): |
|
406 |
"""The actual job workers. |
|
407 |
|
|
408 |
""" |
|
366 | 409 |
def RunTask(self, job): |
367 | 410 |
"""Job executor. |
368 | 411 |
|
... | ... | |
376 | 419 |
logging.info("Worker %s processing job %s", |
377 | 420 |
self.worker_id, job.id) |
378 | 421 |
proc = mcpu.Processor(self.pool.queue.context) |
379 |
self.queue = queue = job.queue
|
|
422 |
queue = job.queue |
|
380 | 423 |
try: |
381 | 424 |
try: |
382 | 425 |
count = len(job.ops) |
... | ... | |
412 | 455 |
finally: |
413 | 456 |
queue.release() |
414 | 457 |
|
415 |
def _Log(*args): |
|
416 |
"""Append a log entry. |
|
417 |
|
|
418 |
""" |
|
419 |
assert len(args) < 3 |
|
420 |
|
|
421 |
if len(args) == 1: |
|
422 |
log_type = constants.ELOG_MESSAGE |
|
423 |
log_msg = args[0] |
|
424 |
else: |
|
425 |
log_type, log_msg = args |
|
426 |
|
|
427 |
# The time is split to make serialization easier and not lose |
|
428 |
# precision. |
|
429 |
timestamp = utils.SplitTime(time.time()) |
|
430 |
|
|
431 |
queue.acquire() |
|
432 |
try: |
|
433 |
job.log_serial += 1 |
|
434 |
op.log.append((job.log_serial, timestamp, log_type, log_msg)) |
|
435 |
|
|
436 |
job.change.notifyAll() |
|
437 |
finally: |
|
438 |
queue.release() |
|
439 |
|
|
440 |
# Make sure not to hold lock while _Log is called |
|
441 |
self.opcode = op |
|
442 |
result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart) |
|
458 |
# Make sure not to hold queue lock while calling ExecOpCode |
|
459 |
result = proc.ExecOpCode(input_opcode, |
|
460 |
_OpCodeExecCallbacks(queue, job, op)) |
|
443 | 461 |
|
444 | 462 |
queue.acquire() |
445 | 463 |
try: |
b/lib/mcpu.py | ||
---|---|---|
38 | 38 |
from ganeti import locking |
39 | 39 |
|
40 | 40 |
|
41 |
class OpExecCbBase: |
|
42 |
"""Base class for OpCode execution callbacks. |
|
43 |
|
|
44 |
""" |
|
45 |
def NotifyStart(self): |
|
46 |
"""Called when we are about to execute the LU. |
|
47 |
|
|
48 |
This function is called when we're about to start the lu's Exec() method, |
|
49 |
that is, after we have acquired all locks. |
|
50 |
|
|
51 |
""" |
|
52 |
|
|
53 |
def Feedback(self, *args): |
|
54 |
"""Sends feedback from the LU code to the end-user. |
|
55 |
|
|
56 |
""" |
|
57 |
|
|
58 |
|
|
41 | 59 |
class Processor(object): |
42 | 60 |
"""Object which runs OpCodes""" |
43 | 61 |
DISPATCH_TABLE = { |
... | ... | |
103 | 121 |
def __init__(self, context): |
104 | 122 |
"""Constructor for Processor |
105 | 123 |
|
106 |
Args: |
|
107 |
- feedback_fn: the feedback function (taking one string) to be run when |
|
108 |
interesting events are happening |
|
109 | 124 |
""" |
110 | 125 |
self.context = context |
111 |
self._feedback_fn = None
|
|
126 |
self._cbs = None
|
|
112 | 127 |
self.exclusive_BGL = False |
113 | 128 |
self.rpc = rpc.RpcRunner(context.cfg) |
114 | 129 |
self.hmclass = HooksMaster |
... | ... | |
122 | 137 |
hm = HooksMaster(self.rpc.call_hooks_runner, lu) |
123 | 138 |
h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE) |
124 | 139 |
lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results, |
125 |
self._feedback_fn, None)
|
|
140 |
self._Feedback, None)
|
|
126 | 141 |
|
127 | 142 |
if getattr(lu.op, "dry_run", False): |
128 | 143 |
# in this mode, no post-hooks are run, and the config is not |
... | ... | |
133 | 148 |
return lu.dry_run_result |
134 | 149 |
|
135 | 150 |
try: |
136 |
result = lu.Exec(self._feedback_fn)
|
|
151 |
result = lu.Exec(self._Feedback)
|
|
137 | 152 |
h_results = hm.RunPhase(constants.HOOKS_PHASE_POST) |
138 | 153 |
result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results, |
139 |
self._feedback_fn, result)
|
|
154 |
self._Feedback, result)
|
|
140 | 155 |
finally: |
141 | 156 |
# FIXME: This needs locks if not lu_class.REQ_BGL |
142 | 157 |
if write_count != self.context.cfg.write_count: |
... | ... | |
155 | 170 |
adding_locks = level in lu.add_locks |
156 | 171 |
acquiring_locks = level in lu.needed_locks |
157 | 172 |
if level not in locking.LEVELS: |
158 |
if callable(self._run_notifier): |
|
159 |
self._run_notifier() |
|
173 |
if self._cbs: |
|
174 |
self._cbs.NotifyStart() |
|
175 |
|
|
160 | 176 |
result = self._ExecLU(lu) |
161 | 177 |
elif adding_locks and acquiring_locks: |
162 | 178 |
# We could both acquire and add locks at the same level, but for now we |
... | ... | |
196 | 212 |
|
197 | 213 |
return result |
198 | 214 |
|
199 |
def ExecOpCode(self, op, feedback_fn, run_notifier):
|
|
215 |
def ExecOpCode(self, op, cbs):
|
|
200 | 216 |
"""Execute an opcode. |
201 | 217 |
|
202 | 218 |
@type op: an OpCode instance |
203 | 219 |
@param op: the opcode to be executed |
204 |
@type feedback_fn: a function that takes a single argument |
|
205 |
@param feedback_fn: this function will be used as feedback from the LU |
|
206 |
code to the end-user |
|
207 |
@type run_notifier: callable (no arguments) or None |
|
208 |
@param run_notifier: this function (if callable) will be called when |
|
209 |
we are about to call the lu's Exec() method, that |
|
210 |
is, after we have acquired all locks |
|
220 |
@type cbs: L{OpExecCbBase} |
|
221 |
@param cbs: Runtime callbacks |
|
211 | 222 |
|
212 | 223 |
""" |
213 | 224 |
if not isinstance(op, opcodes.OpCode): |
214 | 225 |
raise errors.ProgrammerError("Non-opcode instance passed" |
215 | 226 |
" to ExecOpcode") |
216 | 227 |
|
217 |
self._feedback_fn = feedback_fn |
|
218 |
self._run_notifier = run_notifier |
|
219 |
lu_class = self.DISPATCH_TABLE.get(op.__class__, None) |
|
220 |
if lu_class is None: |
|
221 |
raise errors.OpCodeUnknown("Unknown opcode") |
|
222 |
|
|
223 |
# Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a |
|
224 |
# shared fashion otherwise (to prevent concurrent run with an exclusive LU. |
|
225 |
self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL], |
|
226 |
shared=not lu_class.REQ_BGL) |
|
228 |
self._cbs = cbs |
|
227 | 229 |
try: |
228 |
self.exclusive_BGL = lu_class.REQ_BGL |
|
229 |
lu = lu_class(self, op, self.context, self.rpc) |
|
230 |
lu.ExpandNames() |
|
231 |
assert lu.needed_locks is not None, "needed_locks not set by LU" |
|
232 |
result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE) |
|
230 |
lu_class = self.DISPATCH_TABLE.get(op.__class__, None) |
|
231 |
if lu_class is None: |
|
232 |
raise errors.OpCodeUnknown("Unknown opcode") |
|
233 |
|
|
234 |
# Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a |
|
235 |
# shared fashion otherwise (to prevent concurrent run with an exclusive |
|
236 |
# LU. |
|
237 |
self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL], |
|
238 |
shared=not lu_class.REQ_BGL) |
|
239 |
try: |
|
240 |
self.exclusive_BGL = lu_class.REQ_BGL |
|
241 |
lu = lu_class(self, op, self.context, self.rpc) |
|
242 |
lu.ExpandNames() |
|
243 |
assert lu.needed_locks is not None, "needed_locks not set by LU" |
|
244 |
result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE) |
|
245 |
finally: |
|
246 |
self.context.glm.release(locking.LEVEL_CLUSTER) |
|
247 |
self.exclusive_BGL = False |
|
233 | 248 |
finally: |
234 |
self.context.glm.release(locking.LEVEL_CLUSTER) |
|
235 |
self.exclusive_BGL = False |
|
249 |
self._cbs = None |
|
236 | 250 |
|
237 | 251 |
return result |
238 | 252 |
|
253 |
def _Feedback(self, *args): |
|
254 |
"""Forward call to feedback callback function. |
|
255 |
|
|
256 |
""" |
|
257 |
if self._cbs: |
|
258 |
self._cbs.Feedback(*args) |
|
259 |
|
|
239 | 260 |
def LogStep(self, current, total, message): |
240 | 261 |
"""Log a change in LU execution progress. |
241 | 262 |
|
242 | 263 |
""" |
243 | 264 |
logging.debug("Step %d/%d %s", current, total, message) |
244 |
self._feedback_fn("STEP %d/%d %s" % (current, total, message))
|
|
265 |
self._Feedback("STEP %d/%d %s" % (current, total, message))
|
|
245 | 266 |
|
246 | 267 |
def LogWarning(self, message, *args, **kwargs): |
247 | 268 |
"""Log a warning to the logs and the user. |
... | ... | |
258 | 279 |
message = message % tuple(args) |
259 | 280 |
if message: |
260 | 281 |
logging.warning(message) |
261 |
self._feedback_fn(" - WARNING: %s" % message)
|
|
282 |
self._Feedback(" - WARNING: %s" % message)
|
|
262 | 283 |
if "hint" in kwargs: |
263 |
self._feedback_fn(" Hint: %s" % kwargs["hint"])
|
|
284 |
self._Feedback(" Hint: %s" % kwargs["hint"])
|
|
264 | 285 |
|
265 | 286 |
def LogInfo(self, message, *args): |
266 | 287 |
"""Log an informational message to the logs and the user. |
... | ... | |
269 | 290 |
if args: |
270 | 291 |
message = message % tuple(args) |
271 | 292 |
logging.info(message) |
272 |
self._feedback_fn(" - INFO: %s" % message)
|
|
293 |
self._Feedback(" - INFO: %s" % message)
|
|
273 | 294 |
|
274 | 295 |
|
275 | 296 |
class HooksMaster(object): |
Also available in: Unified diff