Revision 87ed6b79
b/lib/cmdlib/base.py

     self.other = kwargs


+class LUWConfdClient(object):
+  """Wrapper class for wconfd client calls from LUs.
+
+  Correctly updates the cache of the LU's owned locks
+  when leaving. Also transparently adds the context
+  for resource requests.
+
+  """
+  def __init__(self, lu):
+    self.lu = lu
+
+  def TryUpdateLocks(self, req):
+    jid, livelockfile = self.lu.wconfdcontext
+    self.lu.wconfd.Client().TryUpdateLocks(jid, livelockfile, req)
+    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid, livelockfile)
+
+  def DownGradeLocksLevel(self, level):
+    jid, livelockfile = self.lu.wconfdcontext
+    self.lu.wconfd.Client().DownGradeLocksLevel(jid, livelockfile, level)
+    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid, livelockfile)
+
+  def FreeLocksLevel(self, level):
+    jid, livelockfile = self.lu.wconfdcontext
+    self.lu.wconfd.Client().FreeLocksLevel(jid, livelockfile, level)
+    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid, livelockfile)
+
+
 class LogicalUnit(object):
   """Logical Unit base class.

...
   HTYPE = None
   REQ_BGL = True

-  def __init__(self, processor, op, context, rpc_runner):
+  def __init__(self, processor, op, context, rpc_runner, wconfdcontext, wconfd):
     """Constructor for LogicalUnit.

     This needs to be overridden in derived classes in order to check op
     validity.

+    @type wconfdcontext: (int, string)
+    @param wconfdcontext: the identity of the logical unit to represent itself
+        to wconfd when asking for resources; it is given as job id and livelock
+        file.
+    @param wconfd: the wconfd class to use; dependency injection to allow
+        testability.
+
     """
     self.proc = processor
     self.op = op
     self.cfg = context.cfg
-    self.glm = context.glm
-    # readability alias
-    self.owned_locks = context.glm.list_owned
+    self.wconfdlocks = []
+    self.wconfdcontext = wconfdcontext
     self.context = context
     self.rpc = rpc_runner
+    self.wconfd = wconfd  # wconfd module to use, for testing

     # Dictionaries used to declare locking needs to mcpu
     self.needed_locks = None
...

     self.CheckArguments()

+  def WConfdClient(self):
+    return LUWConfdClient(self)
+
+  def owned_locks(self, level):
+    """Return the list of locks owned by the LU at a given level.
+
+    This method assumes that the field wconfdlocks is set correctly
+    by mcpu.
+
+    """
+    levelprefix = "%s/" % (locking.LEVEL_NAMES[level],)
+    locks = set([lock[0][len(levelprefix):]
+                 for lock in self.wconfdlocks
+                 if lock[0].startswith(levelprefix)])
+    expand_fns = {
+      locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
+      locking.LEVEL_INSTANCE: self.cfg.GetInstanceList,
+      locking.LEVEL_NODE_ALLOC: (lambda: [locking.NAL]),
+      locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList,
+      locking.LEVEL_NODE: self.cfg.GetNodeList,
+      locking.LEVEL_NODE_RES: self.cfg.GetNodeList,
+      locking.LEVEL_NETWORK: self.cfg.GetNetworkList,
+      }
+    if locking.LOCKSET_NAME in locks:
+      return expand_fns[level]()
+    else:
+      return locks
+
+  def release_request(self, level, names):
+    """Return a request to release the specified locks of the given level.
+
+    Correctly break up the group lock to do so.
+
+    """
+    levelprefix = "%s/" % (locking.LEVEL_NAMES[level],)
+    release = [[levelprefix + lock, "release"] for lock in names]
+
+    # if we break up the set-lock, make sure we ask for the rest of it.
+    setlock = levelprefix + locking.LOCKSET_NAME
+    if [setlock, "exclusive"] in self.wconfdlocks:
+      owned = self.owned_locks(level)
+      request = [[levelprefix + lock, "exclusive"]
+                 for lock in owned
+                 if lock not in names]
+    elif [setlock, "shared"] in self.wconfdlocks:
+      owned = self.owned_locks(level)
+      request = [[levelprefix + lock, "shared"]
+                 for lock in owned
+                 if lock not in names]
+    else:
+      request = []
+
+    return release + [[setlock, "release"]] + request
+
   def CheckArguments(self):
     """Check syntactic validity for the opcode arguments.

...

     # caller specified names and we must keep the same order
     assert self.names
-    assert not self.do_locking or lu.glm.is_owned(lock_level)

     missing = set(self.wanted).difference(names)
     if missing:
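
Editorial note on the change above: the base class now caches lock ownership as a list of [name, mode] pairs in lu.wconfdlocks, and release_request turns a partial release into an explicit wconfd request that also re-asks for the remainder of a broken-up set-lock. The following self-contained sketch is an illustration added for this write-up, not code from the revision; the instance names are invented, and unlike the real method, which expands the set-lock against the cluster configuration, it only looks at names already present in the owned list.

    # Hedged sketch of the request format release_request produces.
    LOCKSET = "[lockset]"

    def sketch_release_request(owned, names, prefix="instance/"):
      """Build a release request from owned [name, mode] pairs."""
      release = [[prefix + name, "release"] for name in names]
      setlock = prefix + LOCKSET
      modes = dict(owned)
      if setlock in modes:
        # Breaking up the set-lock: re-request the remaining members explicitly.
        keep = [[lock, modes[setlock]] for lock, _ in owned
                if lock != setlock and lock[len(prefix):] not in names]
        return release + [[setlock, "release"]] + keep
      return release

    owned = [["instance/[lockset]", "shared"],
             ["instance/inst1", "shared"],
             ["instance/inst2", "shared"]]
    print(sketch_release_request(owned, ["inst1"]))
    # [['instance/inst1', 'release'], ['instance/[lockset]', 'release'],
    #  ['instance/inst2', 'shared']]
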
b/lib/cmdlib/cluster.py

     """Computes the list of nodes and their attributes.

     """
-    # Locking is not used
-    assert not (compat.any(lu.glm.is_owned(level)
-                           for level in locking.LEVELS
-                           if level != locking.LEVEL_CLUSTER) or
-                self.do_locking or self.use_locking)
-
     if query.CQ_CONFIG in self.requested_data:
       cluster = lu.cfg.GetClusterInfo()
       nodes = lu.cfg.GetAllNodesInfo()
...

     """
     if self.wanted_names is None:
-      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
+      self.wanted_names = \
+          map(self.cfg.GetInstanceName,
+              self.owned_locks(locking.LEVEL_INSTANCE))

     self.wanted_instances = \
       map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
...
       per_node_disks[pnode].append((instance, idx, disk))

     assert not (frozenset(per_node_disks.keys()) -
-                self.owned_locks(locking.LEVEL_NODE_RES)), \
+                frozenset(self.owned_locks(locking.LEVEL_NODE_RES))), \
       "Not owning correct locks"
     assert not self.owned_locks(locking.LEVEL_NODE)
b/lib/cmdlib/instance.py

     assert not (self.owned_locks(locking.LEVEL_NODE_RES) -
                 self.owned_locks(locking.LEVEL_NODE)), \
       "Node locks differ from node resource locks"
-    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

     ht_kind = self.op.hypervisor
     if ht_kind in constants.HTS_REQ_PORT:
...
     # Otherwise the new lock would have to be added in acquired mode.
     assert self.REQ_BGL
     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
-    self.glm.remove(locking.LEVEL_INSTANCE, old_name)
-    self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

     # re-read the instance from the configuration after rename
     renamed_inst = self.cfg.GetInstanceInfo(self.instance.uuid)
b/lib/cmdlib/instance_migration.py

       ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

     else:
-      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
-
       secondary_node_uuids = self.instance.secondary_nodes
       if not secondary_node_uuids:
         raise errors.ConfigurationError("No secondary node but using"
b/lib/cmdlib/instance_storage.py

     ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
     ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

-    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
-
     if self.op.node_uuids:
       node_uuids = self.op.node_uuids
     else:
...
       ReleaseLocks(self, locking.LEVEL_NODE)

       # Downgrade lock while waiting for sync
-      self.glm.downgrade(locking.LEVEL_INSTANCE)
+      self.WConfdClient().DownGradeLocksLevel(
+        locking.LEVEL_NAMES[locking.LEVEL_INSTANCE])

     assert wipe_disks ^ (old_disk_size is None)

...
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
       else:
-        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
-
       self._LockInstancesNodes()

     elif level == locking.LEVEL_NODE_RES:
...
     """Check prerequisites.

     """
-    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
-            self.op.iallocator is None)
-
     # Verify if node group locks are still correct
     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
     if owned_groups:
...
             (owned_nodes, self.node_secondary_ip.keys()))
     assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
             self.lu.owned_locks(locking.LEVEL_NODE_RES))
-    assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

     owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
     assert list(owned_instances) == [self.instance_name], \
       "Instance '%s' not locked" % self.instance_name

-    assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
-      "Should not own any node group lock at this point"
-
     if not self.disks:
       feedback_fn("No disks need replacement for instance '%s'" %
                   self.instance.name)
b/lib/cmdlib/instance_utils.py

   @param keep: Names of locks to retain

   """
+  logging.debug("Lu %s ReleaseLocks %s names=%s, keep=%s",
+                lu.wconfdcontext, level, names, keep)
   assert not (keep is not None and names is not None), \
     "Only one of the 'names' and the 'keep' parameters can be given"

...
   else:
     should_release = None

+  levelname = locking.LEVEL_NAMES[level]
+
   owned = lu.owned_locks(level)
   if not owned:
     # Not owning any lock at this level, do nothing
...
     assert len(lu.owned_locks(level)) == (len(retain) + len(release))

     # Release just some locks
-    lu.glm.release(level, names=release)
-
+    lu.WConfdClient().TryUpdateLocks(
+      lu.release_request(level, release))
     assert frozenset(lu.owned_locks(level)) == frozenset(retain)
   else:
-    # Release everything
-    lu.glm.release(level)
-
-    assert not lu.glm.is_owned(level), "No locks should be owned"
+    lu.WConfdClient().FreeLocksLevel(levelname)


 def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
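
Editorial note on ReleaseLocks: the names/keep semantics stay, but the function now ends in one of two wconfd calls, a targeted TryUpdateLocks built from lu.release_request for a partial release, or FreeLocksLevel when the whole level is freed. The sketch below is an illustration with invented node names, not code from the revision; the release/retain split it performs is inferred from the surrounding context of the hunk (the part computing should_release is elided in the diff).

    # Hedged sketch of the release/retain split feeding the two wconfd calls.
    def split_locks(owned, names=None, keep=None):
      """Return (release, retain) name lists for a ReleaseLocks-style call."""
      assert not (keep is not None and names is not None), \
        "Only one of the 'names' and the 'keep' parameters can be given"
      if names is not None:
        wanted = frozenset(names)
        should_release = lambda name: name in wanted
      elif keep is not None:
        kept = frozenset(keep)
        should_release = lambda name: name not in kept
      else:
        # No filter given: the whole level is freed (FreeLocksLevel path).
        return list(owned), []
      release = [name for name in owned if should_release(name)]
      retain = [name for name in owned if not should_release(name)]
      return release, retain

    print(split_locks(["node1", "node2", "node3"], keep=["node2"]))
    # (['node1', 'node3'], ['node2'])
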
b/lib/cmdlib/misc.py

 import logging
 import time

-from ganeti import compat
 from ganeti import constants
 from ganeti import errors
 from ganeti import locking
...
     """Computes the list of nodes and their attributes.

     """
-    # Locking is not used
-    assert not (compat.any(lu.glm.is_owned(level)
-                           for level in locking.LEVELS
-                           if level != locking.LEVEL_CLUSTER) or
-                self.do_locking or self.use_locking)
-
     valid_nodes = [node.uuid
                    for node in lu.cfg.GetAllNodesInfo().values()
                    if not node.offline and node.vm_capable]
b/lib/cmdlib/operating_system.py

 """Logical units dealing with OS."""

-from ganeti import compat
 from ganeti import locking
 from ganeti import qlang
 from ganeti import query
...
     """Computes the list of nodes and their attributes.

     """
-    # Locking is not used
-    assert not (compat.any(lu.glm.is_owned(level)
-                           for level in locking.LEVELS
-                           if level != locking.LEVEL_CLUSTER) or
-                self.do_locking or self.use_locking)
-
     valid_node_uuids = [node.uuid
                         for node in lu.cfg.GetAllNodesInfo().values()
                         if not node.offline and node.vm_capable]
b/lib/locking.py

 # to acquire. Hide this behind this nicely named constant.
 ALL_SET = None

+LOCKSET_NAME = "[lockset]"
+

 def _TimeoutZero():
   """Returns the number zero.
b/lib/mcpu.py

 from ganeti import locking
 from ganeti import utils
 from ganeti import compat
+from ganeti import wconfd


 _OP_PREFIX = "Op"
...
                                " queries) can not submit jobs")


-def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
+def _VerifyLocks(lu, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
                  _nal_whitelist=_NODE_ALLOC_WHITELIST):
   """Performs consistency checks on locks acquired by a logical unit.

   @type lu: L{cmdlib.LogicalUnit}
   @param lu: Logical unit instance
-  @type glm: L{locking.GanetiLockManager}
-  @param glm: Lock manager

   """
   if not __debug__:
     return

-  have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)
+  allocset = lu.owned_locks(locking.LEVEL_NODE_ALLOC)
+  have_nal = locking.NAL in allocset

   for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
     # TODO: Verify using actual lock mode, not using LU variables
...
     if lu.__class__ in _nal_whitelist:
       assert not have_nal, \
         "LU is whitelisted for not acquiring the node allocation lock"
-    elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
+    elif lu.needed_locks[level] == locking.ALL_SET:
       assert have_nal, \
         ("Node allocation lock must be used if an LU acquires all nodes"
          " or node resources")
...
     self.rpc = context.rpc
     self.hmclass = hooksmaster.HooksMaster
     self._enable_locks = enable_locks
+    self.wconfd = wconfd  # Indirection to allow testing

   def _CheckLocksEnabled(self):
     """Checks if locking is enabled.
...
     """
     self._CheckLocksEnabled()

+    # TODO: honor priority in lock allocation
     if self._cbs:
-      priority = self._cbs.CurrentPriority()
+      priority = self._cbs.CurrentPriority()  # pylint: disable=W0612
     else:
       priority = None

-    acquired = self.context.glm.acquire(level, names, shared=shared,
-                                        timeout=timeout, priority=priority,
-                                        opportunistic=opportunistic)
+    if names == locking.ALL_SET:
+      if opportunistic:
+        expand_fns = {
+          locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
+          locking.LEVEL_INSTANCE: self.context.cfg.GetInstanceList,
+          locking.LEVEL_NODE_ALLOC: (lambda: [locking.NAL]),
+          locking.LEVEL_NODEGROUP: self.context.cfg.GetNodeGroupList,
+          locking.LEVEL_NODE: self.context.cfg.GetNodeList,
+          locking.LEVEL_NODE_RES: self.context.cfg.GetNodeList,
+          locking.LEVEL_NETWORK: self.context.cfg.GetNetworkList,
+          }
+        names = expand_fns[level]()
+      else:
+        names = locking.LOCKSET_NAME
+
+    if isinstance(names, str):
+      names = [names]
+
+    levelname = locking.LEVEL_NAMES[level]
+    jid = int(self.GetECId())
+    livelockfile = self.context.livelock.lockfile.name
+
+    locks = ["%s/%s" % (levelname, lock) for lock in list(names)]

-    if acquired is None:
-      raise LockAcquireTimeout()
+    if not names:
+      logging.debug("Acquiring no locks for %d (%s) at level %s",
+                    jid, livelockfile, levelname)
+      return []

-    return acquired
+    if shared:
+      request = [[lock, "shared"] for lock in locks]
+    else:
+      request = [[lock, "exclusive"] for lock in locks]
+
+    if opportunistic:
+      logging.debug("Opportunistically acquiring some of %s for %d (%s).",
+                    locks, jid, livelockfile)
+      locks = self.wconfd.Client().OpportunisticLockUnion(jid, livelockfile,
+                                                          request)
+    elif timeout is None:
+      while True:
+        ## TODO: use asynchronous wait instead of polling
+        blockedon = self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
+                                                        request)
+        logging.debug("Requesting %s for %d (%s) blocked on %s",
+                      request, jid, livelockfile, blockedon)
+        if not blockedon:
+          break
+        time.sleep(random.random())
+    else:
+      logging.debug("Trying %ss to request %s for %d (%s)",
+                    timeout, request, jid, livelockfile)
+      ## TODO: use blocking wait instead of polling
+      blocked = utils.SimpleRetry([], self.wconfd.Client().TryUpdateLocks, 0.1,
+                                  timeout, args=[jid, livelockfile, request])
+      if blocked:
+        raise LockAcquireTimeout()
+
+    return locks

   def _ExecLU(self, lu):
     """Logical Unit execution sequence.
...
     given LU and its opcodes.

     """
-    glm = self.context.glm
     adding_locks = level in lu.add_locks
     acquiring_locks = level in lu.needed_locks

     if level not in locking.LEVELS:
-      _VerifyLocks(lu, glm)
+      _VerifyLocks(lu)

       if self._cbs:
         self._cbs.NotifyStart()
...

         self._AcquireLocks(level, needed_locks, share, opportunistic,
                            calc_timeout())
+        (jid, livelockfile) = lu.wconfdcontext
+        lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, livelockfile)
       else:
         # Adding locks
         add_locks = lu.add_locks[level]
+        if isinstance(add_locks, str):
+          add_locks = [add_locks]
         lu.remove_locks[level] = add_locks

         try:
-          glm.add(level, add_locks, acquired=1, shared=share)
-        except errors.LockError:
+          jid = int(self.GetECId())
+          livelockfile = self.context.livelock.lockfile.name
+          levelname = locking.LEVEL_NAMES[level]
+
+          if share:
+            mode = "shared"
+          else:
+            mode = "exclusive"
+
+          request = [["%s/%s" % (levelname, lock), mode]
+                     for lock in add_locks]
+
+          logging.debug("Requesting %s for %d (%s)",
+                        request, jid, livelockfile)
+          blocked = \
+            self.wconfd.Client().TryUpdateLocks(jid, livelockfile, request)
+          assert blocked == [], "Allocating newly 'created' locks failed"
+          (jid, livelockfile) = lu.wconfdcontext
+          lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, livelockfile)
+        except errors.GenericError:
+          # TODO: verify what actually caused the error
           logging.exception("Detected lock error in level %s for locks"
                             " %s, shared=%s", level, add_locks, share)
           raise errors.OpPrereqError(
...
         result = self._LockAndExecLU(lu, level + 1, calc_timeout)
       finally:
         if level in lu.remove_locks:
-          glm.remove(level, lu.remove_locks[level])
+          jid = int(self.GetECId())
+          livelockfile = self.context.livelock.lockfile.name
+          levelname = locking.LEVEL_NAMES[level]
+          request = [["%s/%s" % (levelname, lock), "release"]
+                     for lock in lu.remove_locks[level]]
+          blocked = \
+            self.wconfd.Client().TryUpdateLocks(jid, livelockfile, request)
+          assert blocked == [], "Release may not fail"
     finally:
-      if glm.is_owned(level):
-        glm.release(level)
-
+      jid = int(self.GetECId())
+      livelockfile = self.context.livelock.lockfile.name
+      levelname = locking.LEVEL_NAMES[level]
+      logging.debug("Freeing locks at level %s for %d (%s)",
+                    levelname, jid, livelockfile)
+      self.wconfd.Client().FreeLocksLevel(jid, livelockfile, levelname)
     else:
       result = self._LockAndExecLU(lu, level + 1, calc_timeout)

...
                                  " disabled" % op.OP_ID)

     try:
-      lu = lu_class(self, op, self.context, self.rpc)
+      jid = int(self.GetECId())
+      livelockfile = self.context.livelock.lockfile.name
+      lu = lu_class(self, op, self.context, self.rpc, (jid, livelockfile),
+                    self.wconfd)
+      lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, livelockfile)
       lu.ExpandNames()
       assert lu.needed_locks is not None, "needed_locks not set by LU"

...
       self.context.cfg.DropECReservations(self._ec_id)
     finally:
       # Release BGL if owned
-      if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
-        assert self._enable_locks
-        self.context.glm.release(locking.LEVEL_CLUSTER)
+      jid = int(self.GetECId())
+      livelockfile = self.context.livelock.lockfile.name
+      bglname = "%s/%s" % (locking.LEVEL_NAMES[locking.LEVEL_CLUSTER],
+                           locking.BGL)
+      self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
+                                          [[bglname, "release"]])
     finally:
       self._cbs = None
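
Editorial note on _AcquireLocks: with the in-process lock manager gone, the method turns the request into [name, mode] pairs and, when no timeout is given, polls wconfd's TryUpdateLocks until the returned "blocked on" list is empty (with utils.SimpleRetry handling the timed case). The following self-contained sketch is an illustration of that polling pattern, not code from the revision; FakeWconfdClient and the lock name are invented stand-ins.

    import random
    import time

    class FakeWconfdClient(object):
      """Stand-in for the wconfd RPC client used only for this sketch."""
      def __init__(self):
        self._attempts = 0

      def TryUpdateLocks(self, jid, livelockfile, request):
        # Pretend the locks become free on the third attempt.
        self._attempts += 1
        return [] if self._attempts >= 3 else [name for name, _ in request]

    def acquire_blocking(client, jid, livelockfile, request):
      """Re-send the request until nothing is reported as blocking."""
      while True:
        blockedon = client.TryUpdateLocks(jid, livelockfile, request)
        if not blockedon:
          return [name for name, _ in request]
        # The revision sleeps a random fraction of a second between retries.
        time.sleep(random.random() * 0.01)

    locks = acquire_blocking(FakeWconfdClient(), 1234, "/tmp/livelock",
                             [["node/node1.example.com", "exclusive"]])
    print(locks)  # ['node/node1.example.com']
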
b/lib/server/masterd.py

       object.__setattr__(self, name, value)

   def AddNode(self, node, ec_id):
-    """Adds a node to the configuration and lock manager.
+    """Adds a node to the configuration.

     """
     # Add it to the configuration
...
     # If preseeding fails it'll not be added
     self.jobqueue.AddNode(node)

-    # Add the new node to the Ganeti Lock Manager
-    self.glm.add(locking.LEVEL_NODE, node.uuid)
-    self.glm.add(locking.LEVEL_NODE_RES, node.uuid)
-
   def ReaddNode(self, node):
     """Updates a node that's already in the configuration

...
     # Notify job queue
     self.jobqueue.RemoveNode(node.name)

-    # Remove the node from the Ganeti Lock Manager
-    self.glm.remove(locking.LEVEL_NODE, node.uuid)
-    self.glm.remove(locking.LEVEL_NODE_RES, node.uuid)
-

 def _SetWatcherPause(context, until):
   """Creates or removes the watcher pause file.
b/test/py/cmdlib/testsupport/cmdlib_testcase.py

 from cmdlib.testsupport.config_mock import ConfigMock
 from cmdlib.testsupport.iallocator_mock import patchIAllocator
-from cmdlib.testsupport.lock_manager_mock import LockManagerMock
+from cmdlib.testsupport.livelock_mock import LiveLockMock
 from cmdlib.testsupport.netutils_mock import patchNetutils, \
   SetupDefaultNetutilsMock
 from cmdlib.testsupport.processor_mock import ProcessorMock
 from cmdlib.testsupport.rpc_runner_mock import CreateRpcRunnerMock, \
   RpcResultsBuilder, patchRpc, SetupDefaultRpcModuleMock
 from cmdlib.testsupport.ssh_mock import patchSsh
+from cmdlib.testsupport.wconfd_mock import WConfdMock

 from ganeti.cmdlib.base import LogicalUnit
 from ganeti import errors
-from ganeti import locking
 from ganeti import objects
 from ganeti import opcodes
 from ganeti import runtime
...
   # pylint: disable=W0212
   cfg = property(fget=lambda self: self._test_case.cfg)
   # pylint: disable=W0212
-  glm = property(fget=lambda self: self._test_case.glm)
-  # pylint: disable=W0212
   rpc = property(fget=lambda self: self._test_case.rpc)

   def __init__(self, test_case):
     self._test_case = test_case
+    self.livelock = LiveLockMock()

   def AddNode(self, node, ec_id):
     self._test_case.cfg.AddNode(node, ec_id)
-    self._test_case.glm.add(locking.LEVEL_NODE, node.uuid)
-    self._test_case.glm.add(locking.LEVEL_NODE_RES, node.uuid)

   def ReaddNode(self, node):
     pass

   def RemoveNode(self, node):
     self._test_case.cfg.RemoveNode(node.uuid)
-    self._test_case.glm.remove(locking.LEVEL_NODE, node.uuid)
-    self._test_case.glm.remove(locking.LEVEL_NODE_RES, node.uuid)


 class MockLU(LogicalUnit):
...
   The environment can be customized via the following fields:

   * C{cfg}: @see L{ConfigMock}
-  * C{glm}: @see L{LockManagerMock}
   * C{rpc}: @see L{CreateRpcRunnerMock}
   * C{iallocator_cls}: @see L{patchIAllocator}
   * C{mcpu}: @see L{ProcessorMock}
...

     """
     self.cfg = ConfigMock()
-    self.glm = LockManagerMock()
     self.rpc = CreateRpcRunnerMock()
     self.ctx = GanetiContextMock(self)
     self.mcpu = ProcessorMock(self.ctx)
...
     @return: A mock LU

     """
-    return MockLU(self.mcpu, mock.MagicMock(), self.ctx, self.rpc)
+    return MockLU(self.mcpu, mock.MagicMock(), self.ctx, self.rpc,
+                  (1234, "/tmp/mock/livelock"), WConfdMock())

   def RpcResultsBuilder(self, use_node_names=False):
     """Creates a pre-configured L{RpcResultBuilder}
...
     @return: the result of the LU's C{Exec} method

     """
-    self.glm.AddLocksFromConfig(self.cfg)
-
     return self.mcpu.ExecOpCodeAndRecordOutput(opcode)

   def ExecOpCodeExpectException(self, opcode,
...
     @return: the result of test_func

     """
-    self.glm.AddLocksFromConfig(self.cfg)
-
     return self.mcpu.RunWithLockedLU(opcode, test_func)

   def assertLogContainsMessage(self, expected_msg):
b/test/py/cmdlib/testsupport/processor_mock.py

 from ganeti import constants
 from ganeti import mcpu

+from cmdlib.testsupport.wconfd_mock import WConfdMock
+

 class LogRecordingCallback(mcpu.OpExecCbBase):
   """Helper class for log output recording.
...
     super(ProcessorMock, self).__init__(context, 1, True)
     self.log_entries = []
     self._lu_test_func = None
+    self.wconfd = WConfdMock()

   def ExecOpCodeAndRecordOutput(self, op):
     """Executes the given opcode and records the output for further inspection.
b/test/py/ganeti.hooks_unittest.py

     self.context = FakeContext()
     # WARNING: here we pass None as RpcRunner instance since we know
     # our usage via HooksMaster will not use lu.rpc
-    self.lu = FakeLU(FakeProc(), self.op, self.context, None)
+    self.lu = FakeLU(FakeProc(), self.op, self.context, None, (123, "/foo/bar"),
+                     None)

   def testTotalFalse(self):
     """Test complete rpc failure"""
...

     self.op = opcodes.OpTestDummy(result=False, messages=[], fail=False)
     self.lu = FakeEnvWithCustomPostHookNodesLU(FakeProc(), self.op,
-                                               FakeContext(), None)
+                                               FakeContext(), None,
+                                               (123, "/foo/bar"),
+                                               None)

   def _HooksRpc(self, *args):
     self._rpcs.append(args)
b/test/py/ganeti.mcpu_unittest.py

   def __init__(self, needed_locks, share_locks):
     self.needed_locks = needed_locks
     self.share_locks = share_locks
+    self.locks = []

-
-class _FakeGlm:
-  def __init__(self, owning_nal):
-    self._owning_nal = owning_nal
-
-  def check_owned(self, level, names):
-    assert level == locking.LEVEL_NODE_ALLOC
-    assert names == locking.NAL
-    return self._owning_nal
-
-  def owning_all(self, level):
-    return False
+  def owned_locks(self, *_):
+    return self.locks


 class TestVerifyLocks(unittest.TestCase):
   def testNoLocks(self):
     lu = _FakeLuWithLocks({}, {})
-    glm = _FakeGlm(False)
-    mcpu._VerifyLocks(lu, glm,
+    mcpu._VerifyLocks(lu,
                       _mode_whitelist=NotImplemented,
                       _nal_whitelist=NotImplemented)

...
         level: 0,
         locking.LEVEL_NODE_ALLOC: 0,
         })
-    glm = _FakeGlm(False)
-    mcpu._VerifyLocks(lu, glm, _mode_whitelist=[], _nal_whitelist=[])
+    mcpu._VerifyLocks(lu, _mode_whitelist=[], _nal_whitelist=[])

   def testDifferentMode(self):
     for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
...
         level: 0,
         locking.LEVEL_NODE_ALLOC: 1,
         })
-      glm = _FakeGlm(False)
       try:
-        mcpu._VerifyLocks(lu, glm, _mode_whitelist=[], _nal_whitelist=[])
+        mcpu._VerifyLocks(lu, _mode_whitelist=[], _nal_whitelist=[])
       except AssertionError, err:
         self.assertTrue("using the same mode as nodes" in str(err))
       else:
         self.fail("Exception not raised")

       # Once more with the whitelist
-      mcpu._VerifyLocks(lu, glm, _mode_whitelist=[_FakeLuWithLocks],
+      mcpu._VerifyLocks(lu, _mode_whitelist=[_FakeLuWithLocks],
                         _nal_whitelist=[])

   def testSameMode(self):
...
         level: 1,
         locking.LEVEL_NODE_ALLOC: 1,
         })
-      glm = _FakeGlm(True)

       try:
-        mcpu._VerifyLocks(lu, glm, _mode_whitelist=[_FakeLuWithLocks],
+        mcpu._VerifyLocks(lu, _mode_whitelist=[_FakeLuWithLocks],
                           _nal_whitelist=[])
       except AssertionError, err:
         self.assertTrue("whitelisted to use different modes" in str(err))
...
         self.fail("Exception not raised")

       # Once more without the whitelist
-      mcpu._VerifyLocks(lu, glm, _mode_whitelist=[], _nal_whitelist=[])
+      mcpu._VerifyLocks(lu, _mode_whitelist=[], _nal_whitelist=[])

   def testAllWithoutAllocLock(self):
     for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
...
         level: 0,
         locking.LEVEL_NODE_ALLOC: 0,
         })
-      glm = _FakeGlm(False)

       try:
-        mcpu._VerifyLocks(lu, glm, _mode_whitelist=[], _nal_whitelist=[])
+        mcpu._VerifyLocks(lu, _mode_whitelist=[], _nal_whitelist=[])
       except AssertionError, err:
         self.assertTrue("allocation lock must be used if" in str(err))
       else:
         self.fail("Exception not raised")

       # Once more with the whitelist
-      mcpu._VerifyLocks(lu, glm, _mode_whitelist=[],
+      mcpu._VerifyLocks(lu, _mode_whitelist=[],
                         _nal_whitelist=[_FakeLuWithLocks])

   def testAllWithAllocLock(self):
...
         level: 0,
         locking.LEVEL_NODE_ALLOC: 0,
         })
-      glm = _FakeGlm(True)
+      lu.locks = [locking.NAL]

       try:
-        mcpu._VerifyLocks(lu, glm, _mode_whitelist=[],
+        mcpu._VerifyLocks(lu, _mode_whitelist=[],
                           _nal_whitelist=[_FakeLuWithLocks])
       except AssertionError, err:
         self.assertTrue("whitelisted for not acquiring" in str(err))
...
         self.fail("Exception not raised")

       # Once more without the whitelist
-      mcpu._VerifyLocks(lu, glm, _mode_whitelist=[], _nal_whitelist=[])
+      mcpu._VerifyLocks(lu, _mode_whitelist=[], _nal_whitelist=[])


 if __name__ == "__main__":