Revision 34c5ec6c

b/qa/qa_job_utils.py
24 24
"""
25 25

  
26 26
import re
27
import threading
28
import time
27 29

  
28 30
from ganeti import constants
29 31
from ganeti import locking
......
38 40
AVAILABLE_LOCKS = [locking.LEVEL_NODE, ]
39 41

  
40 42

  
41
def _GetOutputFromMaster(cmd):
43
def _GetOutputFromMaster(cmd, use_multiplexer=True, log_cmd=True):
42 44
  """ Gets the output of a command executed on master.
43 45

  
44 46
  """
......
51 53
  # buildbot
52 54
  cmdstr += " 2>&1"
53 55

  
54
  return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr)
56
  return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr,
57
                          use_multiplexer=use_multiplexer, log_cmd=log_cmd)
55 58

  
56 59

  
57 60
def ExecuteJobProducingCommand(cmd):
......
107 110
  AssertCommand("echo a | socat -u stdin UNIX-CLIENT:%s" % termination_socket)
108 111

  
109 112

  
113
def _GetNodeUUIDMap(nodes):
114
  """ Given a list of nodes, retrieves a mapping of their names to UUIDs.
115

  
116
  @type nodes: list of string
117
  @param nodes: The nodes to retrieve a map for. If empty, returns information
118
                for all the nodes.
119

  
120
  """
121
  cmd = ["gnt-node", "list", "--no-header", "-o", "name,uuid"]
122
  cmd.extend(nodes)
123
  output = _GetOutputFromMaster(cmd)
124
  return dict(map(lambda x: x.split(), output.splitlines()))
125

  
126

  
127
def _FindLockNames(locks):
128
  """ Finds the ids and descriptions of locks that given locks can block.
129

  
130
  @type locks: dict of locking level to list
131
  @param locks: The locks that gnt-debug delay is holding.
132

  
133
  @rtype: dict of string to string
134
  @return: The lock name to entity name map.
135

  
136
  For a given set of locks, some internal locks (e.g. ALL_SET locks) can be
137
  blocked even though they were not listed explicitly. This function has to take
138
  care and list all locks that can be blocked by the locks given as parameters.
139

  
140
  """
141
  lock_map = {}
142

  
143
  if locking.LEVEL_NODE in locks:
144
    node_locks = locks[locking.LEVEL_NODE]
145
    if node_locks == locking.ALL_SET:
146
      # Empty list retrieves all info
147
      name_uuid_map = _GetNodeUUIDMap([])
148
    else:
149
      name_uuid_map = _GetNodeUUIDMap(node_locks)
150

  
151
    for name in name_uuid_map:
152
      lock_map["node/%s" % name_uuid_map[name]] = name
153

  
154
    # If ALL_SET was requested explicitly, or there is at least one lock
155
    # Note that locking.ALL_SET is None and hence the strange form of the if
156
    if node_locks == locking.ALL_SET or node_locks:
157
      lock_map["node/[lockset]"] = "joint node lock"
158

  
159
  #TODO add other lock types here when support for these is added
160
  return lock_map
161

  
162

  
163
def _GetBlockingLocks():
164
  """ Finds out which locks are blocking jobs by invoking "gnt-debug locks".
165

  
166
  @rtype: list of string
167
  @return: The names of the locks currently blocking any job.
168

  
169
  """
170
  # Due to mysterious issues when a SSH multiplexer is being used by two
171
  # threads, we turn it off, and block most of the logging to improve the
172
  # visibility of the other thread's output
173
  locks_output = _GetOutputFromMaster("gnt-debug locks", use_multiplexer=False,
174
                                      log_cmd=False)
175

  
176
  # The first non-empty line is the header, which we do not need
177
  lock_lines = locks_output.splitlines()[1:]
178

  
179
  blocking_locks = []
180
  for lock_line in lock_lines:
181
    components = lock_line.split()
182
    if len(components) != 4:
183
      raise qa_error.Error("Error while parsing gnt-debug locks output, "
184
                           "line at fault is: %s" % lock_line)
185

  
186
    lock_name, _, _, pending_jobs = components
187

  
188
    if pending_jobs != '-':
189
      blocking_locks.append(lock_name)
190

  
191
  return blocking_locks
192

  
193

  
110 194
# TODO: Can this be done as a decorator? Implement as needed.
111 195
def RunWithLocks(fn, locks, timeout, *args, **kwargs):
112 196
  """ Runs the given function, acquiring a set of locks beforehand.
......
123 207
  test, to try and see if the function can run in parallel with other
124 208
  operations.
125 209

  
126
  The current version simply creates the locks, which expire after a given
127
  timeout, and attempts to invoke the provided function.
128

  
129
  This will probably block the QA, and future versions will address this.
210
  Locks are acquired by invoking a gnt-debug delay operation which can be
211
  interrupted as needed. The QA test is then run in a separate thread, with the
212
  current thread observing jobs waiting for locks. When a job is spotted waiting
213
  for a lock held by the started delay operation, this is noted, and the delay
214
  is interrupted, allowing the QA test to continue.
130 215

  
131 216
  A default timeout is not provided by design - the test creator must make a
132 217
  good conservative estimate.
......
139 224
  # The watcher may interfere by issuing its own jobs - therefore pause it
140 225
  AssertCommand(["gnt-cluster", "watcher", "pause", "12h"])
141 226

  
142
  termination_socket = _StartDelayFunction(locks, timeout)
227
  # Find out the lock names prior to starting the delay function
228
  lock_name_map = _FindLockNames(locks)
143 229

  
144
  fn(*args, **kwargs)
230
  termination_socket = _StartDelayFunction(locks, timeout)
145 231

  
146
  _TerminateDelayFunction(termination_socket)
232
  qa_thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
233
  qa_thread.start()
234

  
235
  blocking_owned_locks = []
236
  test_blocked = False
237

  
238
  try:
239
    while qa_thread.isAlive():
240
      blocking_locks = _GetBlockingLocks()
241
      blocking_owned_locks = \
242
        set(blocking_locks).intersection(set(lock_name_map))
243

  
244
      if blocking_owned_locks:
245
        test_blocked = True
246
        _TerminateDelayFunction(termination_socket)
247
        break
248

  
249
      # The sleeping time has been set arbitrarily
250
      time.sleep(5)
251
  except:
252
    # If anything goes wrong here, we should be responsible and terminate the
253
    # delay job
254
    _TerminateDelayFunction(termination_socket)
255
    raise
256

  
257
  qa_thread.join()
258

  
259
  if test_blocked:
260
    blocking_lock_names = map(lock_name_map.get, blocking_owned_locks)
261
    raise qa_error.Error("QA test succeded, but was blocked by the locks: %s" %
262
                         ", ".join(blocking_lock_names))
263
  else:
264
    _TerminateDelayFunction(termination_socket)
147 265

  
148 266
  # Revive the watcher
149 267
  AssertCommand(["gnt-cluster", "watcher", "continue"])

Also available in: Unified diff