176 |
176 |
"""Object which runs OpCodes"""
|
177 |
177 |
DISPATCH_TABLE = _ComputeDispatchTable()
|
178 |
178 |
|
179 |
|
def __init__(self, context, ec_id, enable_locks=True):
|
|
179 |
def __init__(self, context, ec_id):
|
180 |
180 |
"""Constructor for Processor
|
181 |
181 |
|
182 |
182 |
@type context: GanetiContext
|
... | ... | |
190 |
190 |
self._cbs = None
|
191 |
191 |
self.rpc = rpc.RpcRunner(context.cfg)
|
192 |
192 |
self.hmclass = HooksMaster
|
193 |
|
self._enable_locks = enable_locks
|
194 |
|
|
195 |
|
def _CheckLocksEnabled(self):
|
196 |
|
"""Checks if locking is enabled.
|
197 |
|
|
198 |
|
@raise errors.ProgrammerError: In case locking is not enabled
|
199 |
|
|
200 |
|
"""
|
201 |
|
if not self._enable_locks:
|
202 |
|
raise errors.ProgrammerError("Attempted to use disabled locks")
|
203 |
193 |
|
204 |
194 |
def _AcquireLocks(self, level, names, shared, timeout, priority):
|
205 |
195 |
"""Acquires locks via the Ganeti lock manager.
|
... | ... | |
216 |
206 |
amount of time
|
217 |
207 |
|
218 |
208 |
"""
|
219 |
|
self._CheckLocksEnabled()
|
220 |
|
|
221 |
209 |
if self._cbs:
|
222 |
210 |
self._cbs.CheckCancel()
|
223 |
211 |
|
... | ... | |
303 |
291 |
" others")
|
304 |
292 |
|
305 |
293 |
elif adding_locks or acquiring_locks:
|
306 |
|
self._CheckLocksEnabled()
|
307 |
|
|
308 |
294 |
lu.DeclareLocks(level)
|
309 |
295 |
share = lu.share_locks[level]
|
310 |
296 |
|
... | ... | |
375 |
361 |
|
376 |
362 |
self._cbs = cbs
|
377 |
363 |
try:
|
378 |
|
if self._enable_locks:
|
379 |
|
# Acquire the Big Ganeti Lock exclusively if this LU requires it,
|
380 |
|
# and in a shared fashion otherwise (to prevent concurrent run with
|
381 |
|
# an exclusive LU.
|
382 |
|
self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
|
383 |
|
not lu_class.REQ_BGL, calc_timeout(),
|
384 |
|
priority)
|
385 |
|
elif lu_class.REQ_BGL:
|
386 |
|
raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
|
387 |
|
" disabled" % op.OP_ID)
|
388 |
|
|
|
364 |
# Acquire the Big Ganeti Lock exclusively if this LU requires it,
|
|
365 |
# and in a shared fashion otherwise (to prevent concurrent run with
|
|
366 |
# an exclusive LU.
|
|
367 |
self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
|
|
368 |
not lu_class.REQ_BGL, calc_timeout(),
|
|
369 |
priority)
|
389 |
370 |
try:
|
390 |
371 |
lu = lu_class(self, op, self.context, self.rpc)
|
391 |
372 |
lu.ExpandNames()
|
... | ... | |
398 |
379 |
if self._ec_id:
|
399 |
380 |
self.context.cfg.DropECReservations(self._ec_id)
|
400 |
381 |
finally:
|
401 |
|
# Release BGL if owned
|
402 |
|
if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
|
403 |
|
assert self._enable_locks
|
404 |
|
self.context.glm.release(locking.LEVEL_CLUSTER)
|
|
382 |
self.context.glm.release(locking.LEVEL_CLUSTER)
|
405 |
383 |
finally:
|
406 |
384 |
self._cbs = None
|
407 |
385 |
|