root / qa / qa_job_utils.py @ fbab1c76
History | View | Annotate | Download (9.4 kB)
1 |
#
|
---|---|
2 |
#
|
3 |
|
4 |
# Copyright (C) 2014 Google Inc.
|
5 |
#
|
6 |
# This program is free software; you can redistribute it and/or modify
|
7 |
# it under the terms of the GNU General Public License as published by
|
8 |
# the Free Software Foundation; either version 2 of the License, or
|
9 |
# (at your option) any later version.
|
10 |
#
|
11 |
# This program is distributed in the hope that it will be useful, but
|
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 |
# General Public License for more details.
|
15 |
#
|
16 |
# You should have received a copy of the GNU General Public License
|
17 |
# along with this program; if not, write to the Free Software
|
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 |
# 02110-1301, USA.
|
20 |
|
21 |
|
22 |
"""QA utility functions for testing jobs
|
23 |
|
24 |
"""
|
25 |
|
26 |
import re |
27 |
import threading |
28 |
import time |
29 |
|
30 |
from ganeti import constants |
31 |
from ganeti import locking |
32 |
from ganeti import utils |
33 |
from ganeti.utils import retry |
34 |
|
35 |
import qa_config |
36 |
import qa_error |
37 |
|
38 |
from qa_utils import AssertCommand, GetCommandOutput, GetObjectInfo |
39 |
|
40 |
|
41 |
AVAILABLE_LOCKS = [locking.LEVEL_NODE, ] |
42 |
|
43 |
|
44 |
def _GetOutputFromMaster(cmd, use_multiplexer=True, log_cmd=True): |
45 |
""" Gets the output of a command executed on master.
|
46 |
|
47 |
"""
|
48 |
if isinstance(cmd, basestring): |
49 |
cmdstr = cmd |
50 |
else:
|
51 |
cmdstr = utils.ShellQuoteArgs(cmd) |
52 |
|
53 |
# Necessary due to the stderr stream not being captured properly on the
|
54 |
# buildbot
|
55 |
cmdstr += " 2>&1"
|
56 |
|
57 |
return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr,
|
58 |
use_multiplexer=use_multiplexer, log_cmd=log_cmd) |
59 |
|
60 |
|
61 |
def ExecuteJobProducingCommand(cmd): |
62 |
""" Executes a command that contains the --submit flag, and returns a job id.
|
63 |
|
64 |
@type cmd: list of string
|
65 |
@param cmd: The command to execute, broken into constituent components.
|
66 |
|
67 |
"""
|
68 |
job_id_output = _GetOutputFromMaster(cmd) |
69 |
|
70 |
possible_job_ids = re.findall("JobID: ([0-9]+)", job_id_output)
|
71 |
if len(possible_job_ids) != 1: |
72 |
raise qa_error.Error("Cannot parse command output to find job id: output " |
73 |
"is %s" % job_id_output)
|
74 |
|
75 |
return int(possible_job_ids[0]) |
76 |
|
77 |
|
78 |
def _RetrieveTerminationInfo(job_id): |
79 |
""" Retrieves the termination info from a job caused by gnt-debug delay.
|
80 |
|
81 |
@rtype: dict or None
|
82 |
@return: The termination log entry, or None if no entry was found
|
83 |
|
84 |
"""
|
85 |
job_info = GetObjectInfo(["gnt-job", "info", str(job_id)]) |
86 |
|
87 |
opcodes = job_info[0]["Opcodes"] |
88 |
if not opcodes: |
89 |
raise qa_error.Error("Cannot retrieve a list of opcodes") |
90 |
|
91 |
execution_logs = opcodes[0]["Execution log"] |
92 |
if not execution_logs: |
93 |
return None |
94 |
|
95 |
is_termination_info_fn = \ |
96 |
lambda e: e["Content"][1] == constants.ELOG_DELAY_TEST |
97 |
|
98 |
filtered_logs = filter(is_termination_info_fn, execution_logs)
|
99 |
|
100 |
no_logs = len(filtered_logs)
|
101 |
if no_logs > 1: |
102 |
raise qa_error.Error("Too many interruption information entries found!") |
103 |
elif no_logs == 1: |
104 |
return filtered_logs[0] |
105 |
else:
|
106 |
return None |
107 |
|
108 |
|
109 |
def _StartDelayFunction(locks, timeout): |
110 |
""" Starts the gnt-debug delay option with the given locks and timeout.
|
111 |
|
112 |
"""
|
113 |
# The interruptible switch must be used
|
114 |
cmd = ["gnt-debug", "delay", "-i", "--submit", "--no-master"] |
115 |
|
116 |
for node in locks.get(locking.LEVEL_NODE, []): |
117 |
cmd.append("-n%s" % node)
|
118 |
cmd.append(str(timeout))
|
119 |
|
120 |
job_id = ExecuteJobProducingCommand(cmd) |
121 |
|
122 |
# Waits until a non-empty result is returned from the function
|
123 |
log_entry = retry.SimpleRetry(lambda x: x, _RetrieveTerminationInfo, 2.0, |
124 |
10.0, args=[job_id])
|
125 |
|
126 |
if not log_entry: |
127 |
raise qa_error.Error("Failure when trying to retrieve delay termination " |
128 |
"information")
|
129 |
|
130 |
_, _, (socket_path, ) = log_entry["Content"]
|
131 |
|
132 |
return socket_path
|
133 |
|
134 |
|
135 |
def _TerminateDelayFunction(termination_socket): |
136 |
""" Terminates the delay function by communicating with the domain socket.
|
137 |
|
138 |
"""
|
139 |
AssertCommand("echo a | socat -u stdin UNIX-CLIENT:%s" % termination_socket)
|
140 |
|
141 |
|
142 |
def _GetNodeUUIDMap(nodes): |
143 |
""" Given a list of nodes, retrieves a mapping of their names to UUIDs.
|
144 |
|
145 |
@type nodes: list of string
|
146 |
@param nodes: The nodes to retrieve a map for. If empty, returns information
|
147 |
for all the nodes.
|
148 |
|
149 |
"""
|
150 |
cmd = ["gnt-node", "list", "--no-header", "-o", "name,uuid"] |
151 |
cmd.extend(nodes) |
152 |
output = _GetOutputFromMaster(cmd) |
153 |
return dict(map(lambda x: x.split(), output.splitlines())) |
154 |
|
155 |
|
156 |
def _FindLockNames(locks): |
157 |
""" Finds the ids and descriptions of locks that given locks can block.
|
158 |
|
159 |
@type locks: dict of locking level to list
|
160 |
@param locks: The locks that gnt-debug delay is holding.
|
161 |
|
162 |
@rtype: dict of string to string
|
163 |
@return: The lock name to entity name map.
|
164 |
|
165 |
For a given set of locks, some internal locks (e.g. ALL_SET locks) can be
|
166 |
blocked even though they were not listed explicitly. This function has to take
|
167 |
care and list all locks that can be blocked by the locks given as parameters.
|
168 |
|
169 |
"""
|
170 |
lock_map = {} |
171 |
|
172 |
if locking.LEVEL_NODE in locks: |
173 |
node_locks = locks[locking.LEVEL_NODE] |
174 |
if node_locks == locking.ALL_SET:
|
175 |
# Empty list retrieves all info
|
176 |
name_uuid_map = _GetNodeUUIDMap([]) |
177 |
else:
|
178 |
name_uuid_map = _GetNodeUUIDMap(node_locks) |
179 |
|
180 |
for name in name_uuid_map: |
181 |
lock_map["node/%s" % name_uuid_map[name]] = name
|
182 |
|
183 |
# If ALL_SET was requested explicitly, or there is at least one lock
|
184 |
# Note that locking.ALL_SET is None and hence the strange form of the if
|
185 |
if node_locks == locking.ALL_SET or node_locks: |
186 |
lock_map["node/[lockset]"] = "joint node lock" |
187 |
|
188 |
#TODO add other lock types here when support for these is added
|
189 |
return lock_map
|
190 |
|
191 |
|
192 |
def _GetBlockingLocks(): |
193 |
""" Finds out which locks are blocking jobs by invoking "gnt-debug locks".
|
194 |
|
195 |
@rtype: list of string
|
196 |
@return: The names of the locks currently blocking any job.
|
197 |
|
198 |
"""
|
199 |
# Due to mysterious issues when a SSH multiplexer is being used by two
|
200 |
# threads, we turn it off, and block most of the logging to improve the
|
201 |
# visibility of the other thread's output
|
202 |
locks_output = _GetOutputFromMaster("gnt-debug locks", use_multiplexer=False, |
203 |
log_cmd=False)
|
204 |
|
205 |
# The first non-empty line is the header, which we do not need
|
206 |
lock_lines = locks_output.splitlines()[1:]
|
207 |
|
208 |
blocking_locks = [] |
209 |
for lock_line in lock_lines: |
210 |
components = lock_line.split() |
211 |
if len(components) != 4: |
212 |
raise qa_error.Error("Error while parsing gnt-debug locks output, " |
213 |
"line at fault is: %s" % lock_line)
|
214 |
|
215 |
lock_name, _, _, pending_jobs = components |
216 |
|
217 |
if pending_jobs != '-': |
218 |
blocking_locks.append(lock_name) |
219 |
|
220 |
return blocking_locks
|
221 |
|
222 |
|
223 |
# TODO: Can this be done as a decorator? Implement as needed.
|
224 |
def RunWithLocks(fn, locks, timeout, block, *args, **kwargs): |
225 |
""" Runs the given function, acquiring a set of locks beforehand.
|
226 |
|
227 |
@type fn: function
|
228 |
@param fn: The function to invoke.
|
229 |
@type locks: dict of string to list of string
|
230 |
@param locks: The locks to acquire, per lock category.
|
231 |
@type timeout: number
|
232 |
@param timeout: The number of seconds the locks should be held before
|
233 |
expiring.
|
234 |
@type block: bool
|
235 |
@param block: Whether the test should block when locks are used or not.
|
236 |
|
237 |
This function allows a set of locks to be acquired in preparation for a QA
|
238 |
test, to try and see if the function can run in parallel with other
|
239 |
operations.
|
240 |
|
241 |
Locks are acquired by invoking a gnt-debug delay operation which can be
|
242 |
interrupted as needed. The QA test is then run in a separate thread, with the
|
243 |
current thread observing jobs waiting for locks. When a job is spotted waiting
|
244 |
for a lock held by the started delay operation, this is noted, and the delay
|
245 |
is interrupted, allowing the QA test to continue.
|
246 |
|
247 |
A default timeout is not provided by design - the test creator must make a
|
248 |
good conservative estimate.
|
249 |
|
250 |
"""
|
251 |
if filter(lambda l_type: l_type not in AVAILABLE_LOCKS, locks): |
252 |
raise qa_error.Error("Attempted to acquire locks that cannot yet be " |
253 |
"acquired in the course of a QA test.")
|
254 |
|
255 |
# The watcher may interfere by issuing its own jobs - therefore pause it
|
256 |
AssertCommand(["gnt-cluster", "watcher", "pause", "12h"]) |
257 |
|
258 |
# Find out the lock names prior to starting the delay function
|
259 |
lock_name_map = _FindLockNames(locks) |
260 |
|
261 |
termination_socket = _StartDelayFunction(locks, timeout) |
262 |
|
263 |
qa_thread = threading.Thread(target=fn, args=args, kwargs=kwargs) |
264 |
qa_thread.start() |
265 |
|
266 |
blocking_owned_locks = [] |
267 |
test_blocked = False
|
268 |
|
269 |
try:
|
270 |
while qa_thread.isAlive():
|
271 |
blocking_locks = _GetBlockingLocks() |
272 |
blocking_owned_locks = \ |
273 |
set(blocking_locks).intersection(set(lock_name_map)) |
274 |
|
275 |
if blocking_owned_locks:
|
276 |
test_blocked = True
|
277 |
_TerminateDelayFunction(termination_socket) |
278 |
break
|
279 |
|
280 |
# The sleeping time has been set arbitrarily
|
281 |
time.sleep(5)
|
282 |
except:
|
283 |
# If anything goes wrong here, we should be responsible and terminate the
|
284 |
# delay job
|
285 |
_TerminateDelayFunction(termination_socket) |
286 |
raise
|
287 |
|
288 |
qa_thread.join() |
289 |
|
290 |
blocking_lock_names = ", ".join(map(lock_name_map.get, blocking_owned_locks)) |
291 |
if not block and test_blocked: |
292 |
raise qa_error.Error("QA test succeded, but was blocked by locks: %s" % |
293 |
blocking_lock_names) |
294 |
elif block and not test_blocked: |
295 |
raise qa_error.Error("QA test succeded, but was not blocked as it was " |
296 |
"expected to by locks: %s" % blocking_lock_names)
|
297 |
else:
|
298 |
_TerminateDelayFunction(termination_socket) |
299 |
|
300 |
# Revive the watcher
|
301 |
AssertCommand(["gnt-cluster", "watcher", "continue"]) |