Statistics
| Branch: | Tag: | Revision:

root / qa / qa_job_utils.py @ 1d523139

History | View | Annotate | Download (10.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""QA utility functions for testing jobs
23

24
"""
25

    
26
import re
27
import sys
28
import threading
29
import time
30

    
31
from ganeti import constants
32
from ganeti import locking
33
from ganeti import utils
34
from ganeti.utils import retry
35

    
36
import qa_config
37
import qa_error
38

    
39
from qa_utils import AssertCommand, GetCommandOutput, GetObjectInfo
40

    
41

    
42
AVAILABLE_LOCKS = [locking.LEVEL_NODE, ]
43

    
44

    
45
def _GetOutputFromMaster(cmd, use_multiplexer=True, log_cmd=True):
46
  """ Gets the output of a command executed on master.
47

48
  """
49
  if isinstance(cmd, basestring):
50
    cmdstr = cmd
51
  else:
52
    cmdstr = utils.ShellQuoteArgs(cmd)
53

    
54
  # Necessary due to the stderr stream not being captured properly on the
55
  # buildbot
56
  cmdstr += " 2>&1"
57

    
58
  return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr,
59
                          use_multiplexer=use_multiplexer, log_cmd=log_cmd)
60

    
61

    
62
def ExecuteJobProducingCommand(cmd):
63
  """ Executes a command that contains the --submit flag, and returns a job id.
64

65
  @type cmd: list of string
66
  @param cmd: The command to execute, broken into constituent components.
67

68
  """
69
  job_id_output = _GetOutputFromMaster(cmd)
70

    
71
  possible_job_ids = re.findall("JobID: ([0-9]+)", job_id_output)
72
  if len(possible_job_ids) != 1:
73
    raise qa_error.Error("Cannot parse command output to find job id: output "
74
                         "is %s" % job_id_output)
75

    
76
  return int(possible_job_ids[0])
77

    
78

    
79
def _RetrieveTerminationInfo(job_id):
80
  """ Retrieves the termination info from a job caused by gnt-debug delay.
81

82
  @rtype: dict or None
83
  @return: The termination log entry, or None if no entry was found
84

85
  """
86
  job_info = GetObjectInfo(["gnt-job", "info", str(job_id)])
87

    
88
  opcodes = job_info[0]["Opcodes"]
89
  if not opcodes:
90
    raise qa_error.Error("Cannot retrieve a list of opcodes")
91

    
92
  execution_logs = opcodes[0]["Execution log"]
93
  if not execution_logs:
94
    return None
95

    
96
  is_termination_info_fn = \
97
    lambda e: e["Content"][1] == constants.ELOG_DELAY_TEST
98

    
99
  filtered_logs = filter(is_termination_info_fn, execution_logs)
100

    
101
  no_logs = len(filtered_logs)
102
  if no_logs > 1:
103
    raise qa_error.Error("Too many interruption information entries found!")
104
  elif no_logs == 1:
105
    return filtered_logs[0]
106
  else:
107
    return None
108

    
109

    
110
def _StartDelayFunction(locks, timeout):
111
  """ Starts the gnt-debug delay option with the given locks and timeout.
112

113
  """
114
  # The interruptible switch must be used
115
  cmd = ["gnt-debug", "delay", "-i", "--submit", "--no-master"]
116

    
117
  for node in locks.get(locking.LEVEL_NODE, []):
118
    cmd.append("-n%s" % node)
119
  cmd.append(str(timeout))
120

    
121
  job_id = ExecuteJobProducingCommand(cmd)
122

    
123
  # Waits until a non-empty result is returned from the function
124
  log_entry = retry.SimpleRetry(lambda x: x, _RetrieveTerminationInfo, 2.0,
125
                                10.0, args=[job_id])
126

    
127
  if not log_entry:
128
    raise qa_error.Error("Failure when trying to retrieve delay termination "
129
                         "information")
130

    
131
  _, _, (socket_path, ) = log_entry["Content"]
132

    
133
  return socket_path
134

    
135

    
136
def _TerminateDelayFunction(termination_socket):
137
  """ Terminates the delay function by communicating with the domain socket.
138

139
  """
140
  AssertCommand("echo a | socat -u stdin UNIX-CLIENT:%s" % termination_socket)
141

    
142

    
143
def _GetNodeUUIDMap(nodes):
144
  """ Given a list of nodes, retrieves a mapping of their names to UUIDs.
145

146
  @type nodes: list of string
147
  @param nodes: The nodes to retrieve a map for. If empty, returns information
148
                for all the nodes.
149

150
  """
151
  cmd = ["gnt-node", "list", "--no-header", "-o", "name,uuid"]
152
  cmd.extend(nodes)
153
  output = _GetOutputFromMaster(cmd)
154
  return dict(map(lambda x: x.split(), output.splitlines()))
155

    
156

    
157
def _FindLockNames(locks):
158
  """ Finds the ids and descriptions of locks that given locks can block.
159

160
  @type locks: dict of locking level to list
161
  @param locks: The locks that gnt-debug delay is holding.
162

163
  @rtype: dict of string to string
164
  @return: The lock name to entity name map.
165

166
  For a given set of locks, some internal locks (e.g. ALL_SET locks) can be
167
  blocked even though they were not listed explicitly. This function has to take
168
  care and list all locks that can be blocked by the locks given as parameters.
169

170
  """
171
  lock_map = {}
172

    
173
  if locking.LEVEL_NODE in locks:
174
    node_locks = locks[locking.LEVEL_NODE]
175
    if node_locks == locking.ALL_SET:
176
      # Empty list retrieves all info
177
      name_uuid_map = _GetNodeUUIDMap([])
178
    else:
179
      name_uuid_map = _GetNodeUUIDMap(node_locks)
180

    
181
    for name in name_uuid_map:
182
      lock_map["node/%s" % name_uuid_map[name]] = name
183

    
184
    # If ALL_SET was requested explicitly, or there is at least one lock
185
    # Note that locking.ALL_SET is None and hence the strange form of the if
186
    if node_locks == locking.ALL_SET or node_locks:
187
      lock_map["node/[lockset]"] = "joint node lock"
188

    
189
  #TODO add other lock types here when support for these is added
190
  return lock_map
191

    
192

    
193
def _GetBlockingLocks():
194
  """ Finds out which locks are blocking jobs by invoking "gnt-debug locks".
195

196
  @rtype: list of string
197
  @return: The names of the locks currently blocking any job.
198

199
  """
200
  # Due to mysterious issues when a SSH multiplexer is being used by two
201
  # threads, we turn it off, and block most of the logging to improve the
202
  # visibility of the other thread's output
203
  locks_output = _GetOutputFromMaster("gnt-debug locks", use_multiplexer=False,
204
                                      log_cmd=False)
205

    
206
  # The first non-empty line is the header, which we do not need
207
  lock_lines = locks_output.splitlines()[1:]
208

    
209
  blocking_locks = []
210
  for lock_line in lock_lines:
211
    components = lock_line.split()
212
    if len(components) != 4:
213
      raise qa_error.Error("Error while parsing gnt-debug locks output, "
214
                           "line at fault is: %s" % lock_line)
215

    
216
    lock_name, _, _, pending_jobs = components
217

    
218
    if pending_jobs != '-':
219
      blocking_locks.append(lock_name)
220

    
221
  return blocking_locks
222

    
223

    
224
class QAThread(threading.Thread):
225
  """ An exception-preserving thread that executes a given function.
226

227
  """
228
  def __init__(self, fn, args, kwargs):
229
    """ Constructor accepting the function to be invoked later.
230

231
    """
232
    threading.Thread.__init__(self)
233
    self._fn = fn
234
    self._args = args
235
    self._kwargs = kwargs
236
    self._exc_info = None
237

    
238
  def run(self):
239
    """ Executes the function, preserving exception info if necessary.
240

241
    """
242
    # pylint: disable=W0702
243
    # We explicitly want to catch absolutely anything
244
    try:
245
      self._fn(*self._args, **self._kwargs)
246
    except:
247
      self._exc_info = sys.exc_info()
248
    # pylint: enable=W0702
249

    
250
  def reraise(self):
251
    """ Reraises any exceptions that might have occured during thread execution.
252

253
    """
254
    if self._exc_info is not None:
255
      raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
256

    
257

    
258
# TODO: Can this be done as a decorator? Implement as needed.
259
def RunWithLocks(fn, locks, timeout, block, *args, **kwargs):
260
  """ Runs the given function, acquiring a set of locks beforehand.
261

262
  @type fn: function
263
  @param fn: The function to invoke.
264
  @type locks: dict of string to list of string
265
  @param locks: The locks to acquire, per lock category.
266
  @type timeout: number
267
  @param timeout: The number of seconds the locks should be held before
268
                  expiring.
269
  @type block: bool
270
  @param block: Whether the test should block when locks are used or not.
271

272
  This function allows a set of locks to be acquired in preparation for a QA
273
  test, to try and see if the function can run in parallel with other
274
  operations.
275

276
  Locks are acquired by invoking a gnt-debug delay operation which can be
277
  interrupted as needed. The QA test is then run in a separate thread, with the
278
  current thread observing jobs waiting for locks. When a job is spotted waiting
279
  for a lock held by the started delay operation, this is noted, and the delay
280
  is interrupted, allowing the QA test to continue.
281

282
  A default timeout is not provided by design - the test creator must make a
283
  good conservative estimate.
284

285
  """
286
  if filter(lambda l_type: l_type not in AVAILABLE_LOCKS, locks):
287
    raise qa_error.Error("Attempted to acquire locks that cannot yet be "
288
                         "acquired in the course of a QA test.")
289

    
290
  # The watcher may interfere by issuing its own jobs - therefore pause it
291
  AssertCommand(["gnt-cluster", "watcher", "pause", "12h"])
292

    
293
  # Find out the lock names prior to starting the delay function
294
  lock_name_map = _FindLockNames(locks)
295

    
296
  termination_socket = _StartDelayFunction(locks, timeout)
297

    
298
  qa_thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
299
  qa_thread.start()
300

    
301
  blocking_owned_locks = []
302
  test_blocked = False
303

    
304
  try:
305
    while qa_thread.isAlive():
306
      blocking_locks = _GetBlockingLocks()
307
      blocking_owned_locks = \
308
        set(blocking_locks).intersection(set(lock_name_map))
309

    
310
      if blocking_owned_locks:
311
        test_blocked = True
312
        _TerminateDelayFunction(termination_socket)
313
        break
314

    
315
      # The sleeping time has been set arbitrarily
316
      time.sleep(5)
317
  except:
318
    # If anything goes wrong here, we should be responsible and terminate the
319
    # delay job
320
    _TerminateDelayFunction(termination_socket)
321
    raise
322

    
323
  qa_thread.join()
324

    
325
  blocking_lock_names = ", ".join(map(lock_name_map.get, blocking_owned_locks))
326
  if not block and test_blocked:
327
    raise qa_error.Error("QA test succeded, but was blocked by locks: %s" %
328
                         blocking_lock_names)
329
  elif block and not test_blocked:
330
    raise qa_error.Error("QA test succeded, but was not blocked as it was "
331
                         "expected to by locks: %s" % blocking_lock_names)
332
  else:
333
    _TerminateDelayFunction(termination_socket)
334

    
335
  # Revive the watcher
336
  AssertCommand(["gnt-cluster", "watcher", "continue"])