root / qa / qa_job_utils.py @ 1d523139
History | View | Annotate | Download (10.3 kB)
1 |
#
|
---|---|
2 |
#
|
3 |
|
4 |
# Copyright (C) 2014 Google Inc.
|
5 |
#
|
6 |
# This program is free software; you can redistribute it and/or modify
|
7 |
# it under the terms of the GNU General Public License as published by
|
8 |
# the Free Software Foundation; either version 2 of the License, or
|
9 |
# (at your option) any later version.
|
10 |
#
|
11 |
# This program is distributed in the hope that it will be useful, but
|
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 |
# General Public License for more details.
|
15 |
#
|
16 |
# You should have received a copy of the GNU General Public License
|
17 |
# along with this program; if not, write to the Free Software
|
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 |
# 02110-1301, USA.
|
20 |
|
21 |
|
22 |
"""QA utility functions for testing jobs
|
23 |
|
24 |
"""
|
25 |
|
26 |
import re |
27 |
import sys |
28 |
import threading |
29 |
import time |
30 |
|
31 |
from ganeti import constants |
32 |
from ganeti import locking |
33 |
from ganeti import utils |
34 |
from ganeti.utils import retry |
35 |
|
36 |
import qa_config |
37 |
import qa_error |
38 |
|
39 |
from qa_utils import AssertCommand, GetCommandOutput, GetObjectInfo |
40 |
|
41 |
|
42 |
# Lock levels that RunWithLocks is able to hold during a QA test; requesting
# any level that is not listed here makes RunWithLocks raise qa_error.Error
AVAILABLE_LOCKS = [locking.LEVEL_NODE, ]
43 |
|
44 |
|
45 |
def _GetOutputFromMaster(cmd, use_multiplexer=True, log_cmd=True):
  """ Runs a command on the master node and returns what it printed.

  @type cmd: string or list of string
  @param cmd: Either a complete shell command line, which is used as is, or
              a list of arguments, which is turned into a command line by
              utils.ShellQuoteArgs.
  @param use_multiplexer: Handed to GetCommandOutput unchanged.
  @param log_cmd: Handed to GetCommandOutput unchanged.
  @return: The value returned by GetCommandOutput; as stderr is redirected
           to stdout, it covers both streams.

  """
  if not isinstance(cmd, basestring):
    cmd = utils.ShellQuoteArgs(cmd)

  # stderr is folded into stdout because the stderr stream is not captured
  # properly on the buildbot
  shell_line = cmd + " 2>&1"

  master_name = qa_config.GetMasterNode().primary
  return GetCommandOutput(master_name, shell_line,
                          use_multiplexer=use_multiplexer, log_cmd=log_cmd)
60 |
|
61 |
|
62 |
def ExecuteJobProducingCommand(cmd):
  """ Runs a job-submitting command on the master and extracts the job id.

  @type cmd: list of string
  @param cmd: The command to execute, broken into constituent components. It
              is expected to contain the --submit flag.
  @rtype: int
  @return: The number found in the "JobID: <number>" marker of the output.
  @raise qa_error.Error: If the output does not contain exactly one such
                         marker.

  """
  output = _GetOutputFromMaster(cmd)

  # Zero markers means no job was submitted, several make the id ambiguous
  matches = re.findall("JobID: ([0-9]+)", output)
  if len(matches) == 1:
    return int(matches[0])

  raise qa_error.Error("Cannot parse command output to find job id: output "
                       "is %s" % output)
77 |
|
78 |
|
79 |
def _RetrieveTerminationInfo(job_id):
  """ Looks up the termination log entry of a gnt-debug delay job.

  The entry is searched for in the execution log of the first opcode listed
  by "gnt-job info" for the job.

  @param job_id: The id of the job to inspect.
  @rtype: dict or None
  @return: The termination log entry, or None if no entry was found
  @raise qa_error.Error: If the job lists no opcodes, or if more than one
                         matching log entry exists.

  """
  job_info = GetObjectInfo(["gnt-job", "info", str(job_id)])

  opcodes = job_info[0]["Opcodes"]
  if not opcodes:
    raise qa_error.Error("Cannot retrieve a list of opcodes")

  execution_logs = opcodes[0]["Execution log"]
  if not execution_logs:
    return None

  # The second element of an entry's content identifies the kind of entry
  delay_entries = [entry for entry in execution_logs
                   if entry["Content"][1] == constants.ELOG_DELAY_TEST]

  if len(delay_entries) > 1:
    raise qa_error.Error("Too many interruption information entries found!")

  if delay_entries:
    return delay_entries[0]

  return None
108 |
|
109 |
|
110 |
def _StartDelayFunction(locks, timeout):
  """ Submits an interruptible gnt-debug delay job holding the given locks.

  @type locks: dict of locking level to list
  @param locks: The locks the delay should hold; only the entries stored
                under locking.LEVEL_NODE are used.
  @param timeout: The duration handed to gnt-debug delay.
  @return: The socket path read from the termination log entry of the job;
           it is what _TerminateDelayFunction expects as its argument.
  @raise qa_error.Error: If no termination log entry could be retrieved.

  """
  # NOTE(review): a node entry of locking.ALL_SET (None) cannot be iterated
  # here - confirm that callers never pass it
  node_flags = ["-n%s" % node for node in locks.get(locking.LEVEL_NODE, [])]

  # The interruptible switch (-i) must be used
  cmd = (["gnt-debug", "delay", "-i", "--submit", "--no-master"] +
         node_flags + [str(timeout)])

  job_id = ExecuteJobProducingCommand(cmd)

  # Retries until the lookup returns a non-empty result; 2.0 and 10.0 are
  # presumably the delay between attempts and the overall time limit in
  # seconds - confirm against retry.SimpleRetry
  log_entry = retry.SimpleRetry(lambda result: result,
                                _RetrieveTerminationInfo, 2.0, 10.0,
                                args=[job_id])

  if not log_entry:
    raise qa_error.Error("Failure when trying to retrieve delay termination "
                         "information")

  # The third element of the content is a one-element sequence with the path
  content = log_entry["Content"]
  _, _, (socket_path, ) = content

  return socket_path
|
134 |
|
135 |
|
136 |
def _TerminateDelayFunction(termination_socket):
  """ Interrupts the delay job by writing to its UNIX domain socket.

  @param termination_socket: The socket path, as returned by
                             _StartDelayFunction.

  """
  # A line of input is piped through socat into the socket
  socat_cmd = "echo a | socat -u stdin UNIX-CLIENT:%s" % termination_socket
  AssertCommand(socat_cmd)
|
141 |
|
142 |
|
143 |
def _GetNodeUUIDMap(nodes):
  """ Builds a node name to node UUID mapping using "gnt-node list".

  @type nodes: list of string
  @param nodes: The nodes to retrieve a map for. If empty, returns information
                for all the nodes.
  @rtype: dict of string to string
  @return: The UUID of each listed node, keyed by the node name.

  """
  cmd = ["gnt-node", "list", "--no-header", "-o", "name,uuid"] + list(nodes)
  output = _GetOutputFromMaster(cmd)

  # Every output line has to consist of exactly two fields, name and UUID
  return dict(line.split() for line in output.splitlines())
155 |
|
156 |
|
157 |
def _FindLockNames(locks):
  """ Finds the ids and descriptions of locks that given locks can block.

  @type locks: dict of locking level to list
  @param locks: The locks that gnt-debug delay is holding.

  @rtype: dict of string to string
  @return: The lock name to entity name map.

  For a given set of locks, some internal locks (e.g. ALL_SET locks) can be
  blocked even though they were not listed explicitly. This function has to take
  care and list all locks that can be blocked by the locks given as parameters.

  An empty list of node locks means that no node lock is held, so nothing is
  reported for it. It must not be passed on to _GetNodeUUIDMap, for which an
  empty list means "all nodes".

  """
  lock_map = {}

  if locking.LEVEL_NODE in locks:
    node_locks = locks[locking.LEVEL_NODE]

    # locking.ALL_SET is None, so it has to be told apart from an empty list
    all_nodes = node_locks is locking.ALL_SET

    if all_nodes or node_locks:
      if all_nodes:
        # Empty list retrieves all info
        name_uuid_map = _GetNodeUUIDMap([])
      else:
        name_uuid_map = _GetNodeUUIDMap(node_locks)

      for (name, uuid) in name_uuid_map.items():
        lock_map["node/%s" % uuid] = name

      # ALL_SET was requested explicitly, or there is at least one lock, so
      # the lock guarding the whole set of node locks can be blocked too
      lock_map["node/[lockset]"] = "joint node lock"

  #TODO add other lock types here when support for these is added
  return lock_map
|
191 |
|
192 |
|
193 |
def _GetBlockingLocks():
  """ Lists the locks some job is waiting for, using "gnt-debug locks".

  @rtype: list of string
  @return: The names of the locks currently blocking any job.
  @raise qa_error.Error: If a line of the output does not consist of exactly
                         four whitespace-separated fields.

  """
  # Due to mysterious issues when a SSH multiplexer is being used by two
  # threads, we turn it off, and block most of the logging to improve the
  # visibility of the other thread's output
  locks_output = _GetOutputFromMaster("gnt-debug locks", use_multiplexer=False,
                                      log_cmd=False)

  blocking = []

  # The first line is the header, which we do not need
  for line in locks_output.splitlines()[1:]:
    fields = line.split()
    if len(fields) != 4:
      raise qa_error.Error("Error while parsing gnt-debug locks output, "
                           "line at fault is: %s" % line)

    # The first field is the lock name, the last one lists pending jobs and
    # holds a dash if there are none
    name, pending = fields[0], fields[3]
    if pending != '-':
      blocking.append(name)

  return blocking
|
222 |
|
223 |
|
224 |
class QAThread(threading.Thread):
  """ An exception-preserving thread that executes a given function.

  An exception raised by the function is stored on the thread object. The
  code that started the thread can raise it again by calling the reraise
  method, typically once the thread has been joined.

  """
  def __init__(self, fn, args, kwargs):
    """ Constructor accepting the function to be invoked later.

    @param fn: The function executed by the run method.
    @param args: The positional arguments passed to fn.
    @param kwargs: The keyword arguments passed to fn.

    """
    threading.Thread.__init__(self)
    self._fn = fn
    self._args = args
    self._kwargs = kwargs
    # The sys.exc_info() triple of the exception raised by fn, if any
    self._exc_info = None

  def run(self):
    """ Executes the function, preserving exception info if necessary.

    The return value of the function is discarded.

    """
    # pylint: disable=W0702
    # We explicitly want to catch absolutely anything
    try:
      self._fn(*self._args, **self._kwargs)
    except:
      self._exc_info = sys.exc_info()
    # pylint: enable=W0702

  def reraise(self):
    """ Reraises any exception that occurred during thread execution.

    Does nothing if the function has not raised anything.

    """
    if self._exc_info is not None:
      # Three-argument raise (Python 2 syntax) keeps the original traceback
      raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
256 |
|
257 |
|
258 |
# TODO: Can this be done as a decorator? Implement as needed.
|
259 |
def RunWithLocks(fn, locks, timeout, block, *args, **kwargs):
  """ Runs the given function, acquiring a set of locks beforehand.

  @type fn: function
  @param fn: The function to invoke.
  @type locks: dict of string to list of string
  @param locks: The locks to acquire, per lock category.
  @type timeout: number
  @param timeout: The number of seconds the locks should be held before
                  expiring.
  @type block: bool
  @param block: Whether the test should block when locks are used or not.
  @param args: Positional arguments passed to fn.
  @param kwargs: Keyword arguments passed to fn.
  @raise qa_error.Error: If a lock level outside of AVAILABLE_LOCKS is
                         requested, or if the test was blocked although it
                         should not have been, or the other way around.

  This function allows a set of locks to be acquired in preparation for a QA
  test, to try and see if the function can run in parallel with other
  operations.

  Locks are acquired by invoking a gnt-debug delay operation which can be
  interrupted as needed. The QA test is then run in a separate thread, with the
  current thread observing jobs waiting for locks. When a job is spotted waiting
  for a lock held by the started delay operation, this is noted, and the delay
  is interrupted, allowing the QA test to continue.

  Any exception raised by fn in its thread is raised again in the calling
  thread. Whatever happens, the delay operation is interrupted exactly once
  and the watcher is revived before this function returns or raises.

  A default timeout is not provided by design - the test creator must make a
  good conservative estimate.

  """
  unsupported = [l_type for l_type in locks if l_type not in AVAILABLE_LOCKS]
  if unsupported:
    raise qa_error.Error("Attempted to acquire locks that cannot yet be "
                         "acquired in the course of a QA test.")

  # The watcher may interfere by issuing its own jobs - therefore pause it
  AssertCommand(["gnt-cluster", "watcher", "pause", "12h"])

  blocking_owned_locks = []
  test_blocked = False

  try:
    # Find out the lock names prior to starting the delay function
    lock_name_map = _FindLockNames(locks)

    termination_socket = _StartDelayFunction(locks, timeout)
    delay_fn_terminated = False

    try:
      # QAThread keeps hold of exceptions raised by fn; a plain thread would
      # lose them and a failing test would be taken for a successful one
      qa_thread = QAThread(fn, args, kwargs)
      qa_thread.start()

      while qa_thread.is_alive():
        blocking_locks = _GetBlockingLocks()
        blocking_owned_locks = \
          set(blocking_locks).intersection(set(lock_name_map))

        if blocking_owned_locks:
          # Set the flag first - if the termination attempt fails, it must
          # not be repeated by the cleanup code below
          delay_fn_terminated = True
          _TerminateDelayFunction(termination_socket)
          test_blocked = True
          break

        # The sleeping time has been set arbitrarily
        time.sleep(5)

      # The thread has either finished or has just been unblocked
      qa_thread.join()

      # Surface any error that occurred while running the QA test
      qa_thread.reraise()
    finally:
      # Whatever happened, do not leave the delay job holding its locks
      if not delay_fn_terminated:
        _TerminateDelayFunction(termination_socket)
  finally:
    # Revive the watcher
    AssertCommand(["gnt-cluster", "watcher", "continue"])

  if not block and test_blocked:
    blocking_lock_names = ", ".join(sorted(lock_name_map[lock_id]
                                           for lock_id in blocking_owned_locks))
    raise qa_error.Error("QA test succeded, but was blocked by locks: %s" %
                         blocking_lock_names)
  elif block and not test_blocked:
    # Nothing was blocked, so report the locks that were held instead
    held_lock_names = ", ".join(sorted(lock_name_map.values()))
    raise qa_error.Error("QA test succeded, but was not blocked as it was "
                         "expected to by locks: %s" % held_lock_names)