Statistics
| Branch: | Tag: | Revision:

root / qa / qa_job_utils.py @ 57efdaf5

History | View | Annotate | Download (8.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""QA utility functions for testing jobs
23

24
"""
25

    
26
import re
27
import threading
28
import time
29

    
30
from ganeti import constants
31
from ganeti import locking
32
from ganeti import utils
33

    
34
import qa_config
35
import qa_error
36

    
37
from qa_utils import AssertCommand, GetCommandOutput, GetObjectInfo
38

    
39

    
40
# Lock levels that RunWithLocks accepts in its "locks" argument; any other
# level is rejected there with a qa_error.Error.
AVAILABLE_LOCKS = [locking.LEVEL_NODE, ]
41

    
42

    
43
def _GetOutputFromMaster(cmd, use_multiplexer=True, log_cmd=True):
  """ Runs a command on the master node and returns its output.

  @type cmd: string or list of string
  @param cmd: The command, either as a ready-made shell string or as a list
              of arguments, which is shell-quoted before being executed.
  @type use_multiplexer: bool
  @param use_multiplexer: Passed through unchanged to GetCommandOutput.
  @type log_cmd: bool
  @param log_cmd: Passed through unchanged to GetCommandOutput.
  @return: Whatever GetCommandOutput returns for the command, with stderr
           redirected into stdout.

  """
  if not isinstance(cmd, basestring):
    shell_cmd = utils.ShellQuoteArgs(cmd)
  else:
    shell_cmd = cmd

  # Necessary due to the stderr stream not being captured properly on the
  # buildbot
  shell_cmd = shell_cmd + " 2>&1"

  master = qa_config.GetMasterNode().primary
  return GetCommandOutput(master, shell_cmd,
                          use_multiplexer=use_multiplexer, log_cmd=log_cmd)
58

    
59

    
60
def ExecuteJobProducingCommand(cmd):
  """ Runs a job-submitting command on the master and extracts the job id.

  @type cmd: list of string
  @param cmd: The command to execute, broken into constituent components. It
              is expected to contain the --submit flag.

  @rtype: int
  @return: The id of the job that was submitted.

  @raise qa_error.Error: If the output does not contain exactly one job id.

  """
  output = _GetOutputFromMaster(cmd)

  found_ids = re.findall("JobID: ([0-9]+)", output)
  if len(found_ids) == 1:
    return int(found_ids[0])

  # Zero or several matches - the output cannot be interpreted reliably
  raise qa_error.Error("Cannot parse command output to find job id: output "
                       "is %s" % output)
75

    
76

    
77
def _StartDelayFunction(locks, timeout):
  """ Submits an interruptible gnt-debug delay job holding the given locks.

  @type locks: dict of locking level to list
  @param locks: The locks the delay job should hold. Only the node level is
                consulted here.
  @type timeout: number
  @param timeout: The duration passed to gnt-debug delay.

  @return: The socket path reported in the job's execution log, to be used
           for terminating the delay.

  @raise qa_error.Error: If the execution log does not contain exactly one
                         delay termination entry.

  """
  # NOTE(review): a node entry of locking.ALL_SET (which _FindLockNames
  # accepts, and describes as None) would not be iterable here - confirm
  # whether that case needs to be supported.
  node_args = ["-n%s" % node for node in locks.get(locking.LEVEL_NODE, [])]

  # The interruptible switch must be used
  cmd = ["gnt-debug", "delay", "-i", "--submit", "--no-master"]
  cmd.extend(node_args)
  cmd.append(str(timeout))

  job_id = ExecuteJobProducingCommand(cmd)
  job_info = GetObjectInfo(["gnt-job", "info", str(job_id)])
  execution_logs = job_info[0]["Opcodes"][0]["Execution log"]

  # Keep only the log entries announcing how the delay can be terminated
  termination_entries = [entry for entry in execution_logs
                         if entry["Content"][1] == constants.ELOG_DELAY_TEST]

  if len(termination_entries) != 1:
    raise qa_error.Error("Failure when trying to retrieve delay termination "
                         "information")

  # The third component of the entry is a one-element sequence with the path
  _, _, (socket_path, ) = termination_entries[0]["Content"]

  return socket_path
104

    
105

    
106
def _TerminateDelayFunction(termination_socket):
  """ Stops a running delay job by writing to its termination socket.

  @type termination_socket: string
  @param termination_socket: The path of the UNIX domain socket, as returned
                             by _StartDelayFunction.

  """
  socat_cmd = "echo a | socat -u stdin UNIX-CLIENT:%s" % termination_socket
  AssertCommand(socat_cmd)
111

    
112

    
113
def _GetNodeUUIDMap(nodes):
  """ Retrieves a mapping of node names to node UUIDs.

  @type nodes: list of string
  @param nodes: The nodes to retrieve a map for. If empty, returns information
                for all the nodes.

  @rtype: dict of string to string
  @return: The node name to UUID map, built from the "name,uuid" columns of
           gnt-node list.

  """
  cmd = ["gnt-node", "list", "--no-header", "-o", "name,uuid"] + list(nodes)
  listing = _GetOutputFromMaster(cmd)

  # Every output line is expected to hold exactly a name and a UUID
  name_uuid_pairs = (line.split() for line in listing.splitlines())
  return dict(name_uuid_pairs)
125

    
126

    
127
def _FindLockNames(locks):
  """ Finds the ids and descriptions of locks that given locks can block.

  @type locks: dict of locking level to list
  @param locks: The locks that gnt-debug delay is holding.

  @rtype: dict of string to string
  @return: The lock name to entity name map.

  For a given set of locks, some internal locks (e.g. ALL_SET locks) can be
  blocked even though they were not listed explicitly. This function has to take
  care and list all locks that can be blocked by the locks given as parameters.

  An empty list of node locks yields no node entries at all: nothing is held
  in that case, so nothing can be blocked.

  """
  lock_map = {}

  if locking.LEVEL_NODE in locks:
    node_locks = locks[locking.LEVEL_NODE]
    all_nodes = node_locks == locking.ALL_SET

    # Only query the cluster if ALL_SET was requested explicitly, or there is
    # at least one lock. Note that locking.ALL_SET is None and hence the
    # strange form of the if. An empty list must not reach _GetNodeUUIDMap, as
    # it would be interpreted there as a request for all the nodes.
    if all_nodes or node_locks:
      if all_nodes:
        # Empty list retrieves all info
        name_uuid_map = _GetNodeUUIDMap([])
      else:
        name_uuid_map = _GetNodeUUIDMap(node_locks)

      for name, uuid in name_uuid_map.items():
        lock_map["node/%s" % uuid] = name

      # Holding any node lock also affects the lock of the whole node set
      lock_map["node/[lockset]"] = "joint node lock"

  #TODO add other lock types here when support for these is added
  return lock_map
161

    
162

    
163
def _GetBlockingLocks():
  """ Lists the locks that jobs are waiting for, using "gnt-debug locks".

  @rtype: list of string
  @return: The names of the locks currently blocking any job.

  @raise qa_error.Error: If a line of the output does not consist of exactly
                         four whitespace-separated fields.

  """
  # Due to mysterious issues when a SSH multiplexer is being used by two
  # threads, we turn it off, and block most of the logging to improve the
  # visibility of the other thread's output
  output = _GetOutputFromMaster("gnt-debug locks", use_multiplexer=False,
                                log_cmd=False)

  blocking = []

  # The first line is the header, which we do not need
  for line in output.splitlines()[1:]:
    fields = line.split()
    if len(fields) != 4:
      raise qa_error.Error("Error while parsing gnt-debug locks output, "
                           "line at fault is: %s" % line)

    # First field is the lock name, last one the pending jobs; a dash in the
    # latter means nobody is waiting for the lock
    name, pending = fields[0], fields[3]
    if pending != '-':
      blocking.append(name)

  return blocking
192

    
193

    
194
# TODO: Can this be done as a decorator? Implement as needed.
195
def RunWithLocks(fn, locks, timeout, block, *args, **kwargs):
  """ Runs the given function, acquiring a set of locks beforehand.

  @type fn: function
  @param fn: The function to invoke.
  @type locks: dict of string to list of string
  @param locks: The locks to acquire, per lock category.
  @type timeout: number
  @param timeout: The number of seconds the locks should be held before
                  expiring.
  @type block: bool
  @param block: Whether the test should block when locks are used or not.

  @raise qa_error.Error: If a lock level outside of AVAILABLE_LOCKS is
                         requested, or if the observed blocking behaviour does
                         not match the "block" parameter.

  This function allows a set of locks to be acquired in preparation for a QA
  test, to try and see if the function can run in parallel with other
  operations.

  Locks are acquired by invoking a gnt-debug delay operation which can be
  interrupted as needed. The QA test is then run in a separate thread, with the
  current thread observing jobs waiting for locks. When a job is spotted waiting
  for a lock held by the started delay operation, this is noted, and the delay
  is interrupted, allowing the QA test to continue.

  The delay operation is terminated exactly once and the watcher is resumed
  on every exit path, including failures.

  A default timeout is not provided by design - the test creator must make a
  good conservative estimate.

  """
  unsupported = [l_type for l_type in locks if l_type not in AVAILABLE_LOCKS]
  if unsupported:
    raise qa_error.Error("Attempted to acquire locks that cannot yet be "
                         "acquired in the course of a QA test.")

  # The watcher may interfere by issuing its own jobs - therefore pause it
  AssertCommand(["gnt-cluster", "watcher", "pause", "12h"])

  blocking_owned_locks = []
  test_blocked = False

  try:
    # Find out the lock names prior to starting the delay function
    lock_name_map = _FindLockNames(locks)

    termination_socket = _StartDelayFunction(locks, timeout)
    delay_terminated = False

    try:
      # NOTE: an exception raised by fn only ends its thread; it is not
      # propagated to the caller of this function.
      qa_thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
      qa_thread.start()

      while qa_thread.is_alive():
        blocking_locks = _GetBlockingLocks()
        blocking_owned_locks = \
          set(blocking_locks).intersection(lock_name_map)

        if blocking_owned_locks:
          test_blocked = True
          # Set the flag first - if the termination attempt fails, it must
          # not be repeated during cleanup
          delay_terminated = True
          _TerminateDelayFunction(termination_socket)
          break

        # The sleeping time has been set arbitrarily
        time.sleep(5)

      qa_thread.join()
    finally:
      # Whatever happened, we should be responsible and not leave the delay
      # job holding its locks
      if not delay_terminated:
        _TerminateDelayFunction(termination_socket)
  finally:
    # Revive the watcher
    AssertCommand(["gnt-cluster", "watcher", "continue"])

  if not block and test_blocked:
    blocking_lock_names = ", ".join(lock_name_map[lock_id]
                                    for lock_id in blocking_owned_locks)
    raise qa_error.Error("QA test succeded, but was blocked by locks: %s" %
                         blocking_lock_names)
  elif block and not test_blocked:
    # Nothing was observed blocking, so report the locks that were held
    held_lock_names = ", ".join(sorted(lock_name_map.values()))
    raise qa_error.Error("QA test succeded, but was not blocked as it was "
                         "expected to by locks: %s" % held_lock_names)