Statistics
| Branch: | Tag: | Revision:

root / qa / qa_job_utils.py @ fbab1c76

History | View | Annotate | Download (9.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""QA utility functions for testing jobs
23

24
"""
25

    
26
import re
27
import threading
28
import time
29

    
30
from ganeti import constants
31
from ganeti import locking
32
from ganeti import utils
33
from ganeti.utils import retry
34

    
35
import qa_config
36
import qa_error
37

    
38
from qa_utils import AssertCommand, GetCommandOutput, GetObjectInfo
39

    
40

    
41
# Lock levels that the helpers in this module know how to hold during a QA
# test; RunWithLocks rejects any request naming a level not listed here.
AVAILABLE_LOCKS = [locking.LEVEL_NODE, ]
42

    
43

    
44
def _GetOutputFromMaster(cmd, use_multiplexer=True, log_cmd=True):
  """ Runs a command on the master node and returns what it printed.

  @type cmd: string or list of string
  @param cmd: The command, either as a ready-made shell string or as a list
              of arguments which is shell-quoted before being executed.
  @param use_multiplexer: Handed on unchanged to GetCommandOutput.
  @param log_cmd: Handed on unchanged to GetCommandOutput.

  @return: The result of GetCommandOutput for the command, with stderr
           redirected into stdout by the shell.

  """
  if not isinstance(cmd, basestring):
    cmd = utils.ShellQuoteArgs(cmd)

  # stderr is folded into stdout because the stderr stream is not captured
  # properly on the buildbot
  full_cmd = cmd + " 2>&1"

  master = qa_config.GetMasterNode().primary
  return GetCommandOutput(master, full_cmd,
                          use_multiplexer=use_multiplexer, log_cmd=log_cmd)
59

    
60

    
61
def ExecuteJobProducingCommand(cmd):
  """ Runs a job-submitting command on the master and extracts the job id.

  @type cmd: list of string
  @param cmd: The command to execute, broken into constituent components. It
              is expected to contain the --submit flag.

  @rtype: int
  @return: The id of the submitted job.
  @raise qa_error.Error: If the output does not contain exactly one
                         "JobID: <number>" entry.

  """
  output = _GetOutputFromMaster(cmd)

  matches = re.findall("JobID: ([0-9]+)", output)
  if len(matches) == 1:
    return int(matches[0])

  # Zero matches or several matches are both treated as unparseable output
  raise qa_error.Error("Cannot parse command output to find job id: output "
                       "is %s" % output)
76

    
77

    
78
def _RetrieveTerminationInfo(job_id):
  """ Looks up the termination log entry of a gnt-debug delay job.

  @param job_id: The id of the job to inspect via "gnt-job info".

  @rtype: dict or None
  @return: The termination log entry, or None if no entry was found
  @raise qa_error.Error: If the job lists no opcodes, or if more than one
                         termination entry is present.

  """
  job_info = GetObjectInfo(["gnt-job", "info", str(job_id)])

  opcodes = job_info[0]["Opcodes"]
  if not opcodes:
    raise qa_error.Error("Cannot retrieve a list of opcodes")

  # Only the execution log of the first opcode is examined
  log_entries = opcodes[0]["Execution log"]
  if not log_entries:
    return None

  # An entry counts as termination info when the second element of its
  # content equals the delay test marker
  matching = [entry for entry in log_entries
              if entry["Content"][1] == constants.ELOG_DELAY_TEST]

  if len(matching) > 1:
    raise qa_error.Error("Too many interruption information entries found!")

  if matching:
    return matching[0]

  return None
107

    
108

    
109
def _StartDelayFunction(locks, timeout):
  """ Submits an interruptible gnt-debug delay job holding the given locks.

  @type locks: dict of locking level to list
  @param locks: The locks the delay job should hold; only the entries stored
                under locking.LEVEL_NODE are used here.
  @param timeout: The delay duration, passed to gnt-debug delay as a string.

  @return: The socket path extracted from the job's termination log entry.
  @raise qa_error.Error: If no termination log entry could be retrieved.

  NOTE(review): a None value under LEVEL_NODE (which _FindLockNames treats
  as locking.ALL_SET) cannot be iterated below - confirm that callers never
  pass it to this function.

  """
  node_args = ["-n%s" % node for node in locks.get(locking.LEVEL_NODE, [])]

  # The interruptible switch (-i) must be used
  cmd = ["gnt-debug", "delay", "-i", "--submit", "--no-master"]
  cmd.extend(node_args)
  cmd.append(str(timeout))

  job_id = ExecuteJobProducingCommand(cmd)

  # Keep asking until a non-empty result comes back; the two numbers are
  # presumably the retry delay and the overall timeout - TODO confirm
  # against retry.SimpleRetry
  log_entry = retry.SimpleRetry(lambda result: result,
                                _RetrieveTerminationInfo, 2.0, 10.0,
                                args=[job_id])
  if not log_entry:
    raise qa_error.Error("Failure when trying to retrieve delay termination "
                         "information")

  # The content must have exactly three elements, the last of which is a
  # one-element sequence carrying the socket path
  _, _, socket_info = log_entry["Content"]
  (socket_path, ) = socket_info

  return socket_path
133

    
134

    
135
def _TerminateDelayFunction(termination_socket):
  """ Interrupts a running delay job by writing to its domain socket.

  @param termination_socket: The path of the UNIX domain socket to write to,
                             as returned by _StartDelayFunction.

  """
  # socat forwards the single piped character to the socket
  socat_cmd = "echo a | socat -u stdin UNIX-CLIENT:%s" % termination_socket
  AssertCommand(socat_cmd)
140

    
141

    
142
def _GetNodeUUIDMap(nodes):
  """ Builds a node name to UUID mapping from "gnt-node list" output.

  @type nodes: list of string
  @param nodes: The nodes to retrieve a map for. If empty, returns information
                for all the nodes.

  @rtype: dict of string to string
  @return: The node name to UUID map; every output line has to split into
           exactly two whitespace-separated fields.

  """
  cmd = ["gnt-node", "list", "--no-header", "-o", "name,uuid"] + list(nodes)
  listing = _GetOutputFromMaster(cmd)

  # Each line is "<name> <uuid>", which dict() consumes as a key/value pair
  return dict(line.split() for line in listing.splitlines())
154

    
155

    
156
def _FindLockNames(locks):
  """ Finds the ids and descriptions of locks that given locks can block.

  @type locks: dict of locking level to list
  @param locks: The locks that gnt-debug delay is holding.

  @rtype: dict of string to string
  @return: The lock name to entity name map.

  For a given set of locks, some internal locks (e.g. ALL_SET locks) can be
  blocked even though they were not listed explicitly. This function has to take
  care and list all locks that can be blocked by the locks given as parameters.

  An explicitly empty list of node locks yields no node entries at all: it
  is not forwarded to _GetNodeUUIDMap, which would interpret an empty list as
  a request for every node.

  """
  lock_map = {}

  if locking.LEVEL_NODE in locks:
    node_locks = locks[locking.LEVEL_NODE]

    # ALL_SET is compared against explicitly instead of relying on
    # truthiness, as it is a false value just like an empty list
    all_nodes = (node_locks == locking.ALL_SET)

    if all_nodes:
      # Empty list retrieves all info
      name_uuid_map = _GetNodeUUIDMap([])
    elif node_locks:
      name_uuid_map = _GetNodeUUIDMap(node_locks)
    else:
      # No node locks are held, so there is nothing to look up
      name_uuid_map = {}

    for name in name_uuid_map:
      lock_map["node/%s" % name_uuid_map[name]] = name

    # The joint lock is affected if ALL_SET was requested explicitly, or if
    # there is at least one node lock
    if all_nodes or node_locks:
      lock_map["node/[lockset]"] = "joint node lock"

  #TODO add other lock types here when support for these is added
  return lock_map
190

    
191

    
192
def _GetBlockingLocks():
  """ Lists the locks that jobs are waiting for, using "gnt-debug locks".

  @rtype: list of string
  @return: The names of the locks currently blocking any job.
  @raise qa_error.Error: If a line of the output does not consist of exactly
                         four whitespace-separated fields.

  """
  # The SSH multiplexer misbehaves mysteriously when two threads share it,
  # so it is switched off here; command logging is suppressed as well to
  # keep the other thread's output readable
  output = _GetOutputFromMaster("gnt-debug locks", use_multiplexer=False,
                                log_cmd=False)

  blocking = []

  # The first line is the header, which is not needed
  for line in output.splitlines()[1:]:
    fields = line.split()
    if len(fields) != 4:
      raise qa_error.Error("Error while parsing gnt-debug locks output, "
                           "line at fault is: %s" % line)

    # First field is the lock name, last field the pending jobs; a dash
    # means that nothing is waiting for the lock
    if fields[3] != '-':
      blocking.append(fields[0])

  return blocking
221

    
222

    
223
# TODO: Can this be done as a decorator? Implement as needed.
224
def RunWithLocks(fn, locks, timeout, block, *args, **kwargs):
  """ Runs the given function, acquiring a set of locks beforehand.

  @type fn: function
  @param fn: The function to invoke.
  @type locks: dict of string to list of string
  @param locks: The locks to acquire, per lock category.
  @type timeout: number
  @param timeout: The number of seconds the locks should be held before
                  expiring.
  @type block: bool
  @param block: Whether the test should block when locks are used or not.

  @raise qa_error.Error: If a lock category outside AVAILABLE_LOCKS is
                         requested, or if the observed blocking behaviour
                         does not match the block parameter.

  This function allows a set of locks to be acquired in preparation for a QA
  test, to try and see if the function can run in parallel with other
  operations.

  Locks are acquired by invoking a gnt-debug delay operation which can be
  interrupted as needed. The QA test is then run in a separate thread, with the
  current thread observing jobs waiting for locks. When a job is spotted waiting
  for a lock held by the started delay operation, this is noted, and the delay
  is interrupted, allowing the QA test to continue.

  The delay operation is interrupted at most once, and the watcher is resumed
  on every exit path, including the error cases.

  A default timeout is not provided by design - the test creator must make a
  good conservative estimate.

  NOTE(review): threading.Thread does not hand exceptions raised by its
  target to the thread calling join(), so a failure inside fn does not make
  this function raise.

  """
  unsupported = [l_type for l_type in locks if l_type not in AVAILABLE_LOCKS]
  if unsupported:
    raise qa_error.Error("Attempted to acquire locks that cannot yet be "
                         "acquired in the course of a QA test.")

  # The watcher may interfere by issuing its own jobs - therefore pause it
  AssertCommand(["gnt-cluster", "watcher", "pause", "12h"])

  try:
    # Find out the lock names prior to starting the delay function
    lock_name_map = _FindLockNames(locks)

    termination_socket = _StartDelayFunction(locks, timeout)
    delay_terminated = False

    blocking_owned_locks = []
    test_blocked = False

    try:
      qa_thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
      qa_thread.start()

      while qa_thread.isAlive():
        blocking_locks = _GetBlockingLocks()
        blocking_owned_locks = \
          set(blocking_locks).intersection(set(lock_name_map))

        if blocking_owned_locks:
          test_blocked = True
          # Set the flag first - if the termination attempt fails, it must
          # not be repeated by the cleanup below
          delay_terminated = True
          _TerminateDelayFunction(termination_socket)
          break

        # The sleeping time has been set arbitrarily
        time.sleep(5)

      qa_thread.join()
    finally:
      # Whatever happened above, the delay job must not be left holding its
      # locks - but it must not be interrupted twice either
      if not delay_terminated:
        _TerminateDelayFunction(termination_socket)

    if not block and test_blocked:
      blocking_lock_names = ", ".join(map(lock_name_map.get,
                                          blocking_owned_locks))
      raise qa_error.Error("QA test succeded, but was blocked by locks: %s" %
                           blocking_lock_names)
    elif block and not test_blocked:
      # Nothing blocked the test, so report the locks that were being held
      held_lock_names = ", ".join(sorted(lock_name_map.values()))
      raise qa_error.Error("QA test succeded, but was not blocked as it was "
                           "expected to by locks: %s" % held_lock_names)
  finally:
    # Revive the watcher, even if the test failed
    AssertCommand(["gnt-cluster", "watcher", "continue"])