Revision 34c5ec6c
b/qa/qa_job_utils.py | ||
---|---|---|
24 | 24 |
""" |
25 | 25 |
|
26 | 26 |
import re |
27 |
import threading |
|
28 |
import time |
|
27 | 29 |
|
28 | 30 |
from ganeti import constants |
29 | 31 |
from ganeti import locking |
... | ... | |
38 | 40 |
AVAILABLE_LOCKS = [locking.LEVEL_NODE, ] |
39 | 41 |
|
40 | 42 |
|
41 |
def _GetOutputFromMaster(cmd): |
|
43 |
def _GetOutputFromMaster(cmd, use_multiplexer=True, log_cmd=True):
|
|
42 | 44 |
""" Gets the output of a command executed on master. |
43 | 45 |
|
44 | 46 |
""" |
... | ... | |
51 | 53 |
# buildbot |
52 | 54 |
cmdstr += " 2>&1" |
53 | 55 |
|
54 |
return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr) |
|
56 |
return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr, |
|
57 |
use_multiplexer=use_multiplexer, log_cmd=log_cmd) |
|
55 | 58 |
|
56 | 59 |
|
57 | 60 |
def ExecuteJobProducingCommand(cmd): |
... | ... | |
107 | 110 |
AssertCommand("echo a | socat -u stdin UNIX-CLIENT:%s" % termination_socket) |
108 | 111 |
|
109 | 112 |
|
113 |
def _GetNodeUUIDMap(nodes): |
|
114 |
""" Given a list of nodes, retrieves a mapping of their names to UUIDs. |
|
115 |
|
|
116 |
@type nodes: list of string |
|
117 |
@param nodes: The nodes to retrieve a map for. If empty, returns information |
|
118 |
for all the nodes. |
|
119 |
|
|
120 |
""" |
|
121 |
cmd = ["gnt-node", "list", "--no-header", "-o", "name,uuid"] |
|
122 |
cmd.extend(nodes) |
|
123 |
output = _GetOutputFromMaster(cmd) |
|
124 |
return dict(map(lambda x: x.split(), output.splitlines())) |
|
125 |
|
|
126 |
|
|
127 |
def _FindLockNames(locks): |
|
128 |
""" Finds the ids and descriptions of locks that given locks can block. |
|
129 |
|
|
130 |
@type locks: dict of locking level to list |
|
131 |
@param locks: The locks that gnt-debug delay is holding. |
|
132 |
|
|
133 |
@rtype: dict of string to string |
|
134 |
@return: The lock name to entity name map. |
|
135 |
|
|
136 |
For a given set of locks, some internal locks (e.g. ALL_SET locks) can be |
|
137 |
blocked even though they were not listed explicitly. This function has to take |
|
138 |
care and list all locks that can be blocked by the locks given as parameters. |
|
139 |
|
|
140 |
""" |
|
141 |
lock_map = {} |
|
142 |
|
|
143 |
if locking.LEVEL_NODE in locks: |
|
144 |
node_locks = locks[locking.LEVEL_NODE] |
|
145 |
if node_locks == locking.ALL_SET: |
|
146 |
# Empty list retrieves all info |
|
147 |
name_uuid_map = _GetNodeUUIDMap([]) |
|
148 |
else: |
|
149 |
name_uuid_map = _GetNodeUUIDMap(node_locks) |
|
150 |
|
|
151 |
for name in name_uuid_map: |
|
152 |
lock_map["node/%s" % name_uuid_map[name]] = name |
|
153 |
|
|
154 |
# If ALL_SET was requested explicitly, or there is at least one lock |
|
155 |
# Note that locking.ALL_SET is None and hence the strange form of the if |
|
156 |
if node_locks == locking.ALL_SET or node_locks: |
|
157 |
lock_map["node/[lockset]"] = "joint node lock" |
|
158 |
|
|
159 |
#TODO add other lock types here when support for these is added |
|
160 |
return lock_map |
|
161 |
|
|
162 |
|
|
163 |
def _GetBlockingLocks(): |
|
164 |
""" Finds out which locks are blocking jobs by invoking "gnt-debug locks". |
|
165 |
|
|
166 |
@rtype: list of string |
|
167 |
@return: The names of the locks currently blocking any job. |
|
168 |
|
|
169 |
""" |
|
170 |
# Due to mysterious issues when a SSH multiplexer is being used by two |
|
171 |
# threads, we turn it off, and block most of the logging to improve the |
|
172 |
# visibility of the other thread's output |
|
173 |
locks_output = _GetOutputFromMaster("gnt-debug locks", use_multiplexer=False, |
|
174 |
log_cmd=False) |
|
175 |
|
|
176 |
# The first non-empty line is the header, which we do not need |
|
177 |
lock_lines = locks_output.splitlines()[1:] |
|
178 |
|
|
179 |
blocking_locks = [] |
|
180 |
for lock_line in lock_lines: |
|
181 |
components = lock_line.split() |
|
182 |
if len(components) != 4: |
|
183 |
raise qa_error.Error("Error while parsing gnt-debug locks output, " |
|
184 |
"line at fault is: %s" % lock_line) |
|
185 |
|
|
186 |
lock_name, _, _, pending_jobs = components |
|
187 |
|
|
188 |
if pending_jobs != '-': |
|
189 |
blocking_locks.append(lock_name) |
|
190 |
|
|
191 |
return blocking_locks |
|
192 |
|
|
193 |
|
|
110 | 194 |
# TODO: Can this be done as a decorator? Implement as needed. |
111 | 195 |
def RunWithLocks(fn, locks, timeout, *args, **kwargs): |
112 | 196 |
""" Runs the given function, acquiring a set of locks beforehand. |
... | ... | |
123 | 207 |
test, to try and see if the function can run in parallel with other |
124 | 208 |
operations. |
125 | 209 |
|
126 |
The current version simply creates the locks, which expire after a given |
|
127 |
timeout, and attempts to invoke the provided function. |
|
128 |
|
|
129 |
This will probably block the QA, and future versions will address this. |
|
210 |
Locks are acquired by invoking a gnt-debug delay operation which can be |
|
211 |
interrupted as needed. The QA test is then run in a separate thread, with the |
|
212 |
current thread observing jobs waiting for locks. When a job is spotted waiting |
|
213 |
for a lock held by the started delay operation, this is noted, and the delay |
|
214 |
is interrupted, allowing the QA test to continue. |
|
130 | 215 |
|
131 | 216 |
A default timeout is not provided by design - the test creator must make a |
132 | 217 |
good conservative estimate. |
... | ... | |
139 | 224 |
# The watcher may interfere by issuing its own jobs - therefore pause it |
140 | 225 |
AssertCommand(["gnt-cluster", "watcher", "pause", "12h"]) |
141 | 226 |
|
142 |
termination_socket = _StartDelayFunction(locks, timeout) |
|
227 |
# Find out the lock names prior to starting the delay function |
|
228 |
lock_name_map = _FindLockNames(locks) |
|
143 | 229 |
|
144 |
fn(*args, **kwargs)
|
|
230 |
termination_socket = _StartDelayFunction(locks, timeout)
|
|
145 | 231 |
|
146 |
_TerminateDelayFunction(termination_socket) |
|
232 |
qa_thread = threading.Thread(target=fn, args=args, kwargs=kwargs) |
|
233 |
qa_thread.start() |
|
234 |
|
|
235 |
blocking_owned_locks = [] |
|
236 |
test_blocked = False |
|
237 |
|
|
238 |
try: |
|
239 |
while qa_thread.isAlive(): |
|
240 |
blocking_locks = _GetBlockingLocks() |
|
241 |
blocking_owned_locks = \ |
|
242 |
set(blocking_locks).intersection(set(lock_name_map)) |
|
243 |
|
|
244 |
if blocking_owned_locks: |
|
245 |
test_blocked = True |
|
246 |
_TerminateDelayFunction(termination_socket) |
|
247 |
break |
|
248 |
|
|
249 |
# The sleeping time has been set arbitrarily |
|
250 |
time.sleep(5) |
|
251 |
except: |
|
252 |
# If anything goes wrong here, we should be responsible and terminate the |
|
253 |
# delay job |
|
254 |
_TerminateDelayFunction(termination_socket) |
|
255 |
raise |
|
256 |
|
|
257 |
qa_thread.join() |
|
258 |
|
|
259 |
if test_blocked: |
|
260 |
blocking_lock_names = map(lock_name_map.get, blocking_owned_locks) |
|
261 |
raise qa_error.Error("QA test succeded, but was blocked by the locks: %s" % |
|
262 |
", ".join(blocking_lock_names)) |
|
263 |
else: |
|
264 |
_TerminateDelayFunction(termination_socket) |
|
147 | 265 |
|
148 | 266 |
# Revive the watcher |
149 | 267 |
AssertCommand(["gnt-cluster", "watcher", "continue"]) |
Also available in: Unified diff