Revision 19b9ba9a lib/locking.py
b/lib/locking.py | ||
---|---|---|
30 | 30 |
import threading |
31 | 31 |
import time |
32 | 32 |
import errno |
33 |
import weakref |
|
34 |
import logging |
|
33 | 35 |
|
34 | 36 |
from ganeti import errors |
35 | 37 |
from ganeti import utils |
... | ... | |
409 | 411 |
|
410 | 412 |
""" |
411 | 413 |
__slots__ = [ |
414 |
"__weakref__", |
|
412 | 415 |
"__active_shr_c", |
413 | 416 |
"__inactive_shr_c", |
414 | 417 |
"__deleted", |
... | ... | |
421 | 424 |
|
422 | 425 |
__condition_class = PipeCondition |
423 | 426 |
|
424 |
def __init__(self, name): |
|
427 |
def __init__(self, name, monitor=None):
|
|
425 | 428 |
"""Construct a new SharedLock. |
426 | 429 |
|
427 | 430 |
@param name: the name of the lock |
431 |
@type monitor: L{LockMonitor} |
|
432 |
@param monitor: Lock monitor with which to register |
|
428 | 433 |
|
429 | 434 |
""" |
430 | 435 |
object.__init__(self) |
... | ... | |
448 | 453 |
# is this lock in the deleted state? |
449 | 454 |
self.__deleted = False |
450 | 455 |
|
456 |
# Register with lock monitor |
|
457 |
if monitor: |
|
458 |
monitor.RegisterLock(self) |
|
459 |
|
|
460 |
def GetInfo(self, fields): |
|
461 |
"""Retrieves information for querying locks. |
|
462 |
|
|
463 |
@type fields: list of strings |
|
464 |
@param fields: List of fields to return |
|
465 |
|
|
466 |
""" |
|
467 |
self.__lock.acquire() |
|
468 |
try: |
|
469 |
info = [] |
|
470 |
|
|
471 |
# Note: to avoid unintentional race conditions, no references to |
|
472 |
# modifiable objects should be returned unless they were created in this |
|
473 |
# function. |
|
474 |
for fname in fields: |
|
475 |
if fname == "name": |
|
476 |
info.append(self.name) |
|
477 |
elif fname == "mode": |
|
478 |
if self.__deleted: |
|
479 |
info.append("deleted") |
|
480 |
assert not (self.__exc or self.__shr) |
|
481 |
elif self.__exc: |
|
482 |
info.append("exclusive") |
|
483 |
elif self.__shr: |
|
484 |
info.append("shared") |
|
485 |
else: |
|
486 |
info.append(None) |
|
487 |
elif fname == "owner": |
|
488 |
if self.__exc: |
|
489 |
owner = [self.__exc] |
|
490 |
else: |
|
491 |
owner = self.__shr |
|
492 |
|
|
493 |
if owner: |
|
494 |
assert not self.__deleted |
|
495 |
info.append([i.getName() for i in owner]) |
|
496 |
else: |
|
497 |
info.append(None) |
|
498 |
else: |
|
499 |
raise errors.OpExecError("Invalid query field '%s'" % fname) |
|
500 |
|
|
501 |
return info |
|
502 |
finally: |
|
503 |
self.__lock.release() |
|
504 |
|
|
451 | 505 |
def __check_deleted(self): |
452 | 506 |
"""Raises an exception if the lock has been deleted. |
453 | 507 |
|
... | ... | |
671 | 725 |
self.__deleted = True |
672 | 726 |
self.__exc = None |
673 | 727 |
|
728 |
assert not (self.__exc or self.__shr), "Found owner during deletion" |
|
729 |
|
|
674 | 730 |
# Notify all acquires. They'll throw an error. |
675 | 731 |
while self.__pending: |
676 | 732 |
self.__pending.pop().notifyAll() |
... | ... | |
713 | 769 |
@ivar name: the name of the lockset |
714 | 770 |
|
715 | 771 |
""" |
716 |
def __init__(self, members, name): |
|
772 |
def __init__(self, members, name, monitor=None):
|
|
717 | 773 |
"""Constructs a new LockSet. |
718 | 774 |
|
719 | 775 |
@type members: list of strings |
720 | 776 |
@param members: initial members of the set |
777 |
@type monitor: L{LockMonitor} |
|
778 |
@param monitor: Lock monitor with which to register member locks |
|
721 | 779 |
|
722 | 780 |
""" |
723 | 781 |
assert members is not None, "members parameter is not a list" |
724 | 782 |
self.name = name |
725 | 783 |
|
784 |
# Lock monitor |
|
785 |
self.__monitor = monitor |
|
786 |
|
|
726 | 787 |
# Used internally to guarantee coherency. |
727 | 788 |
self.__lock = SharedLock(name) |
728 | 789 |
|
... | ... | |
731 | 792 |
self.__lockdict = {} |
732 | 793 |
|
733 | 794 |
for mname in members: |
734 |
self.__lockdict[mname] = SharedLock(self._GetLockName(mname)) |
|
795 |
self.__lockdict[mname] = SharedLock(self._GetLockName(mname), |
|
796 |
monitor=monitor) |
|
735 | 797 |
|
736 | 798 |
# The owner dict contains the set of locks each thread owns. For |
737 | 799 |
# performance each thread can access its own key without a global lock on |
... | ... | |
1055 | 1117 |
(invalid_names, self.name)) |
1056 | 1118 |
|
1057 | 1119 |
for lockname in names: |
1058 |
lock = SharedLock(self._GetLockName(lockname)) |
|
1120 |
lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
|
|
1059 | 1121 |
|
1060 | 1122 |
if acquired: |
1061 | 1123 |
lock.acquire(shared=shared) |
... | ... | |
1193 | 1255 |
|
1194 | 1256 |
self.__class__._instance = self |
1195 | 1257 |
|
1258 |
self._monitor = LockMonitor() |
|
1259 |
|
|
1196 | 1260 |
# The keyring contains all the locks, at their level and in the correct |
1197 | 1261 |
# locking order. |
1198 | 1262 |
self.__keyring = { |
1199 |
LEVEL_CLUSTER: LockSet([BGL], "bgl lockset"), |
|
1200 |
LEVEL_NODE: LockSet(nodes, "nodes lockset"), |
|
1201 |
LEVEL_INSTANCE: LockSet(instances, "instances lockset"), |
|
1202 |
} |
|
1263 |
LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor), |
|
1264 |
LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor), |
|
1265 |
LEVEL_INSTANCE: LockSet(instances, "instances", |
|
1266 |
monitor=self._monitor), |
|
1267 |
} |
|
1268 |
|
|
1269 |
def QueryLocks(self, fields, sync): |
|
1270 |
"""Queries information from all locks. |
|
1271 |
|
|
1272 |
See L{LockMonitor.QueryLocks}. |
|
1273 |
|
|
1274 |
""" |
|
1275 |
return self._monitor.QueryLocks(fields, sync) |
|
1203 | 1276 |
|
1204 | 1277 |
def _names(self, level): |
1205 | 1278 |
"""List the lock names at the given level. |
... | ... | |
1352 | 1425 |
"Cannot remove locks at a level while not owning it or" |
1353 | 1426 |
" owning some at a greater one") |
1354 | 1427 |
return self.__keyring[level].remove(names) |
1428 |
|
|
1429 |
|
|
1430 |
class LockMonitor(object): |
|
1431 |
_LOCK_ATTR = "_lock" |
|
1432 |
|
|
1433 |
def __init__(self): |
|
1434 |
"""Initializes this class. |
|
1435 |
|
|
1436 |
""" |
|
1437 |
self._lock = SharedLock("LockMonitor") |
|
1438 |
|
|
1439 |
# Tracked locks. Weak references are used to avoid issues with circular |
|
1440 |
# references and deletion. |
|
1441 |
self._locks = weakref.WeakKeyDictionary() |
|
1442 |
|
|
1443 |
@ssynchronized(_LOCK_ATTR) |
|
1444 |
def RegisterLock(self, lock): |
|
1445 |
"""Registers a new lock. |
|
1446 |
|
|
1447 |
""" |
|
1448 |
logging.debug("Registering lock %s", lock.name) |
|
1449 |
assert lock not in self._locks, "Duplicate lock registration" |
|
1450 |
assert not compat.any(lock.name == i.name for i in self._locks.keys()), \ |
|
1451 |
"Found duplicate lock name" |
|
1452 |
self._locks[lock] = None |
|
1453 |
|
|
1454 |
@ssynchronized(_LOCK_ATTR) |
|
1455 |
def _GetLockInfo(self, fields): |
|
1456 |
"""Get information from all locks while the monitor lock is held. |
|
1457 |
|
|
1458 |
""" |
|
1459 |
result = {} |
|
1460 |
|
|
1461 |
for lock in self._locks.keys(): |
|
1462 |
assert lock.name not in result, "Found duplicate lock name" |
|
1463 |
result[lock.name] = lock.GetInfo(fields) |
|
1464 |
|
|
1465 |
return result |
|
1466 |
|
|
1467 |
def QueryLocks(self, fields, sync): |
|
1468 |
"""Queries information from all locks. |
|
1469 |
|
|
1470 |
@type fields: list of strings |
|
1471 |
@param fields: List of fields to return |
|
1472 |
@type sync: boolean |
|
1473 |
@param sync: Whether to operate in synchronous mode |
|
1474 |
|
|
1475 |
""" |
|
1476 |
if sync: |
|
1477 |
raise NotImplementedError("Synchronous queries are not implemented") |
|
1478 |
|
|
1479 |
# Get all data without sorting |
|
1480 |
result = self._GetLockInfo(fields) |
|
1481 |
|
|
1482 |
# Sort by name |
|
1483 |
return [result[name] for name in utils.NiceSort(result.keys())] |
Also available in: Unified diff