root / qa / qa_job_utils.py @ 57efdaf5
History | View | Annotate | Download (8.7 kB)
1 |
#
|
---|---|
2 |
#
|
3 |
|
4 |
# Copyright (C) 2014 Google Inc.
|
5 |
#
|
6 |
# This program is free software; you can redistribute it and/or modify
|
7 |
# it under the terms of the GNU General Public License as published by
|
8 |
# the Free Software Foundation; either version 2 of the License, or
|
9 |
# (at your option) any later version.
|
10 |
#
|
11 |
# This program is distributed in the hope that it will be useful, but
|
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 |
# General Public License for more details.
|
15 |
#
|
16 |
# You should have received a copy of the GNU General Public License
|
17 |
# along with this program; if not, write to the Free Software
|
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 |
# 02110-1301, USA.
|
20 |
|
21 |
|
22 |
"""QA utility functions for testing jobs
|
23 |
|
24 |
"""
|
25 |
|
26 |
import re |
27 |
import threading |
28 |
import time |
29 |
|
30 |
from ganeti import constants |
31 |
from ganeti import locking |
32 |
from ganeti import utils |
33 |
|
34 |
import qa_config |
35 |
import qa_error |
36 |
|
37 |
from qa_utils import AssertCommand, GetCommandOutput, GetObjectInfo |
38 |
|
39 |
|
40 |
# Lock levels that RunWithLocks accepts; it raises a qa_error.Error for any
# other level found in the locks it is asked to acquire.
AVAILABLE_LOCKS = [locking.LEVEL_NODE]
41 |
|
42 |
|
43 |
def _GetOutputFromMaster(cmd, use_multiplexer=True, log_cmd=True):
  """ Runs a command on the master node and returns what it printed.

  @type cmd: string or list of string
  @param cmd: The command to run; a string is used verbatim, anything else is
              shell-quoted with utils.ShellQuoteArgs first.
  @param use_multiplexer: Passed through unchanged to GetCommandOutput.
  @param log_cmd: Passed through unchanged to GetCommandOutput.

  @return: The result of GetCommandOutput for the command; as stderr is
           redirected into stdout, both streams end up in it.

  """
  shell_cmd = cmd if isinstance(cmd, basestring) else utils.ShellQuoteArgs(cmd)

  # stderr is folded into stdout, as the stderr stream is not captured
  # properly on the buildbot
  master_name = qa_config.GetMasterNode().primary
  return GetCommandOutput(master_name, shell_cmd + " 2>&1",
                          use_multiplexer=use_multiplexer, log_cmd=log_cmd)
58 |
|
59 |
|
60 |
def ExecuteJobProducingCommand(cmd):
  """ Runs a command carrying the --submit flag and returns the job id.

  @type cmd: list of string
  @param cmd: The command to execute, broken into constituent components.

  @rtype: int
  @return: The id of the job reported in the command output.
  @raise qa_error.Error: If the output does not contain exactly one job id.

  """
  output = _GetOutputFromMaster(cmd)
  found_ids = re.findall("JobID: ([0-9]+)", output)

  # Exactly one id is expected; zero or several matches mean the output is
  # not what a single job submission should look like
  if len(found_ids) == 1:
    return int(found_ids[0])

  raise qa_error.Error("Cannot parse command output to find job id: output "
                       "is %s" % output)
75 |
|
76 |
|
77 |
def _StartDelayFunction(locks, timeout):
  """ Starts the gnt-debug delay option with the given locks and timeout.

  @type locks: dict of locking level to list
  @param locks: The locks the delay job should hold. Only the node level is
                passed on to the command; locking.ALL_SET is expanded to the
                names of all nodes.
  @type timeout: number
  @param timeout: The duration passed to gnt-debug delay.

  @return: The path of the socket through which the delay can be interrupted.
  @raise qa_error.Error: If the job id cannot be determined, or if the
                         execution log of the job does not end up containing
                         exactly one termination information entry.

  """
  # The interruptible switch must be used
  cmd = ["gnt-debug", "delay", "-i", "--submit", "--no-master"]

  node_locks = locks.get(locking.LEVEL_NODE, [])
  if node_locks == locking.ALL_SET:
    # ALL_SET cannot be iterated over, so name every node explicitly.
    # NOTE(review): assumes that holding each node individually is an
    # acceptable stand-in for ALL_SET - confirm against gnt-debug delay
    node_locks = sorted(_GetNodeUUIDMap([]))

  cmd.extend("-n%s" % node for node in node_locks)
  cmd.append(str(timeout))

  job_id = ExecuteJobProducingCommand(cmd)

  # The job was merely submitted (--submit), so the log entry carrying the
  # socket path might not have been written yet. Poll for a short while
  # rather than failing on the first look.
  poll_interval = 2.0
  deadline = time.time() + 10.0

  while True:
    job_info = GetObjectInfo(["gnt-job", "info", str(job_id)])
    # Treat a log that is still empty the same way as an empty list
    execution_logs = job_info[0]["Opcodes"][0]["Execution log"] or []

    termination_logs = [entry for entry in execution_logs
                        if entry["Content"][1] == constants.ELOG_DELAY_TEST]

    if termination_logs or time.time() >= deadline:
      break

    time.sleep(poll_interval)

  if len(termination_logs) != 1:
    raise qa_error.Error("Failure when trying to retrieve delay termination "
                         "information")

  # The content is a triple whose last element wraps the socket path
  _, _, (socket_path, ) = termination_logs[0]["Content"]

  return socket_path
|
104 |
|
105 |
|
106 |
def _TerminateDelayFunction(termination_socket):
  """ Stops a running delay job by contacting its domain socket.

  @param termination_socket: The path of the domain socket to connect to;
                             interpolated into a shell command as is.

  """
  socat_cmd = "echo a | socat -u stdin UNIX-CLIENT:%s" % termination_socket
  AssertCommand(socat_cmd)
|
111 |
|
112 |
|
113 |
def _GetNodeUUIDMap(nodes):
  """ Given a list of nodes, retrieves a mapping of their names to UUIDs.

  @type nodes: list of string
  @param nodes: The nodes to retrieve a map for. If empty, returns information
                for all the nodes.

  @rtype: dict of string to string
  @return: The node name to node UUID map.
  @raise qa_error.Error: If a non-blank line of the command output does not
                         consist of exactly a name and a UUID.

  """
  cmd = ["gnt-node", "list", "--no-header", "-o", "name,uuid"]
  cmd.extend(nodes)
  output = _GetOutputFromMaster(cmd)

  name_uuid_map = {}
  for line in output.splitlines():
    fields = line.split()

    if not fields:
      # Blank lines carry no information
      continue

    if len(fields) != 2:
      # The output also contains whatever the command wrote to stderr, so
      # report the offending line instead of failing obscurely in dict()
      raise qa_error.Error("Error while parsing gnt-node list output, "
                           "line at fault is: %s" % line)

    name, uuid = fields
    name_uuid_map[name] = uuid

  return name_uuid_map
125 |
|
126 |
|
127 |
def _FindLockNames(locks):
  """ Maps the given locks to the names of all locks they are able to block.

  @type locks: dict of locking level to list
  @param locks: The locks that gnt-debug delay is holding.

  @rtype: dict of string to string
  @return: The lock name to entity name map.

  Holding a set of locks can also block internal locks (e.g. ALL_SET locks)
  which were never listed explicitly, and these have to be part of the result
  as well.

  """
  found = {}

  if locking.LEVEL_NODE in locks:
    requested = locks[locking.LEVEL_NODE]
    all_requested = requested == locking.ALL_SET

    if all_requested:
      # An empty list makes the query return the info of every node
      names_to_uuids = _GetNodeUUIDMap([])
    else:
      names_to_uuids = _GetNodeUUIDMap(requested)

    for node_name, node_uuid in names_to_uuids.items():
      found["node/%s" % node_uuid] = node_name

    # The joint lock is affected if ALL_SET was asked for explicitly, or if
    # at least one node lock is held. locking.ALL_SET is None, so it has to be
    # checked separately from the truth value of the list.
    if all_requested or requested:
      found["node/[lockset]"] = "joint node lock"

  #TODO add other lock types here when support for these is added
  return found
|
161 |
|
162 |
|
163 |
def _GetBlockingLocks():
  """ Reports the locks jobs are waiting for, using "gnt-debug locks".

  @rtype: list of string
  @return: The names of the locks currently blocking any job.
  @raise qa_error.Error: If a line after the header does not consist of
                         exactly four whitespace-separated columns.

  """
  # The SSH multiplexer is switched off due to mysterious issues when it is
  # used by two threads at once; logging is reduced to keep the output of the
  # other thread visible
  raw_output = _GetOutputFromMaster("gnt-debug locks", use_multiplexer=False,
                                    log_cmd=False)

  waited_for = []

  # The first line is the header and is of no interest
  for line in raw_output.splitlines()[1:]:
    columns = line.split()
    if len(columns) != 4:
      raise qa_error.Error("Error while parsing gnt-debug locks output, "
                           "line at fault is: %s" % line)

    name = columns[0]
    pending = columns[3]

    # A dash in the last column means that no job is pending on the lock
    if pending != '-':
      waited_for.append(name)

  return waited_for
|
192 |
|
193 |
|
194 |
# TODO: Can this be done as a decorator? Implement as needed.
|
195 |
def RunWithLocks(fn, locks, timeout, block, *args, **kwargs):
  """ Runs the given function, acquiring a set of locks beforehand.

  @type fn: function
  @param fn: The function to invoke; it receives any extra positional and
             keyword arguments passed to this function.
  @type locks: dict of string to list of string
  @param locks: The locks to acquire, per lock category.
  @type timeout: number
  @param timeout: The number of seconds the locks should be held before
                  expiring.
  @type block: bool
  @param block: Whether the test should block when locks are used or not.

  @raise qa_error.Error: If a lock level outside of AVAILABLE_LOCKS is
                         requested, if the test was blocked while it should
                         not have been, or if it was not blocked while it
                         should have been.

  This function allows a set of locks to be acquired in preparation for a QA
  test, to try and see if the function can run in parallel with other
  operations.

  Locks are acquired by invoking a gnt-debug delay operation which can be
  interrupted as needed. The QA test is then run in a separate thread, with the
  current thread observing jobs waiting for locks. When a job is spotted waiting
  for a lock held by the started delay operation, this is noted, and the delay
  is interrupted, allowing the QA test to continue.

  An exception raised by fn in its thread is raised again from this function
  once the thread has finished. The delay operation is interrupted and the
  watcher is revived no matter how this function is left.

  A default timeout is not provided by design - the test creator must make a
  good conservative estimate.

  """
  if [l_type for l_type in locks if l_type not in AVAILABLE_LOCKS]:
    raise qa_error.Error("Attempted to acquire locks that cannot yet be "
                         "acquired in the course of a QA test.")

  # The watcher may interfere by issuing its own jobs - therefore pause it
  AssertCommand(["gnt-cluster", "watcher", "pause", "12h"])

  try:
    # Find out the lock names prior to starting the delay function
    lock_name_map = _FindLockNames(locks)

    termination_socket = _StartDelayFunction(locks, timeout)

    # Exceptions do not cross thread boundaries on their own, so whatever fn
    # raises is recorded here and raised again in this thread later on
    qa_failures = []

    def _RunRecordingFailure():
      try:
        fn(*args, **kwargs)
      except Exception as err:
        qa_failures.append(err)
        # Still let the threading module report the failure as usual
        raise

    qa_thread = threading.Thread(target=_RunRecordingFailure)

    blocking_owned_locks = set()
    test_blocked = False
    delay_terminated = False

    try:
      qa_thread.start()

      while qa_thread.is_alive():
        blocking_owned_locks = \
          set(_GetBlockingLocks()).intersection(lock_name_map)

        if blocking_owned_locks:
          test_blocked = True
          # Set the flag before the attempt - should the termination fail,
          # it must not be attempted a second time below
          delay_terminated = True
          _TerminateDelayFunction(termination_socket)
          break

        # The sleeping time has been set arbitrarily
        time.sleep(5)
    finally:
      # Whether the test finished without being blocked or something went
      # wrong, we should be responsible and terminate the delay job - once
      if not delay_terminated:
        _TerminateDelayFunction(termination_socket)

    qa_thread.join()

    # The checks below talk about a test that succeeded, so a failure of the
    # test itself has to be reported first
    if qa_failures:
      raise qa_failures[0]

    if not block and test_blocked:
      blocking_lock_names = ", ".join(map(lock_name_map.get,
                                          blocking_owned_locks))
      raise qa_error.Error("QA test succeded, but was blocked by locks: %s" %
                           blocking_lock_names)
    elif block and not test_blocked:
      # Nothing was blocking here, so name the locks that were being held
      held_lock_names = ", ".join(sorted(lock_name_map.values()))
      raise qa_error.Error("QA test succeded, but was not blocked as it was "
                           "expected to by locks: %s" % held_lock_names)
  finally:
    # Revive the watcher
    AssertCommand(["gnt-cluster", "watcher", "continue"])