Revision 407339d0

b/Makefile.am
272 272
	test/ganeti.hooks_unittest.py \
273 273
	test/ganeti.http_unittest.py \
274 274
	test/ganeti.locking_unittest.py \
275
	test/ganeti.mcpu_unittest.py \
275 276
	test/ganeti.objects_unittest.py \
276 277
	test/ganeti.rapi.resources_unittest.py \
277 278
	test/ganeti.serializer_unittest.py \
b/lib/mcpu.py
29 29
"""
30 30

  
31 31
import logging
32
import random
33
import time
32 34

  
33 35
from ganeti import opcodes
34 36
from ganeti import constants
......
39 41
from ganeti import utils
40 42

  
41 43

  
44
class _LockAcquireTimeout(Exception):
45
  """Internal exception to report timeouts on acquiring locks.
46

  
47
  """
48

  
49

  
50
class _LockTimeoutStrategy(object):
51
  """Class with lock acquire timeout strategy.
52

  
53
  """
54
  __slots__ = [
55
    "_attempts",
56
    "_random_fn",
57
    "_start_time",
58
    ]
59

  
60
  _MAX_ATTEMPTS = 10
61
  """How many retries before going into blocking mode"""
62

  
63
  _ATTEMPT_FACTOR = 1.75
64
  """Factor between attempts"""
65

  
66
  def __init__(self, _random_fn=None):
67
    """Initializes this class.
68

  
69
    @param _random_fn: Random number generator for unittests
70

  
71
    """
72
    object.__init__(self)
73

  
74
    self._start_time = None
75
    self._attempts = 0
76

  
77
    if _random_fn is None:
78
      self._random_fn = random.random
79
    else:
80
      self._random_fn = _random_fn
81

  
82
  def NextAttempt(self):
83
    """Advances to the next attempt.
84

  
85
    """
86
    assert self._attempts >= 0
87
    self._attempts += 1
88

  
89
  def CalcRemainingTimeout(self):
90
    """Returns the remaining timeout.
91

  
92
    """
93
    assert self._attempts >= 0
94

  
95
    if self._attempts == self._MAX_ATTEMPTS:
96
      # Only blocking acquires after 10 retries
97
      return None
98

  
99
    if self._attempts > self._MAX_ATTEMPTS:
100
      raise RuntimeError("Blocking acquire ran into timeout")
101

  
102
    # Get start time on first calculation
103
    if self._start_time is None:
104
      self._start_time = time.time()
105

  
106
    # Calculate remaining time for this attempt
107
    timeout = (self._start_time + (self._ATTEMPT_FACTOR ** self._attempts) -
108
               time.time())
109

  
110
    if timeout > 10.0:
111
      # Cap timeout at 10 seconds. This gives other jobs a chance to run
112
      # even if we're still trying to get our locks, before finally moving
113
      # to a blocking acquire.
114
      timeout = 10.0
115

  
116
    elif timeout < 0.1:
117
      # Lower boundary
118
      timeout = 0.1
119

  
120
    # Add a small variation (-/+ 5%) to timeouts. This helps in situations
121
    # where two or more jobs are fighting for the same lock(s).
122
    variation_range = timeout * 0.1
123
    timeout += (self._random_fn() * variation_range) - (variation_range * 0.5)
124

  
125
    assert timeout >= 0.0, "Timeout must be positive"
126

  
127
    return timeout
128

  
129

  
42 130
class OpExecCbBase:
43 131
  """Base class for OpCode execution callbacks.
44 132

  
......
206 294

  
207 295
    return result
208 296

  
209
  def _LockAndExecLU(self, lu, level):
297
  def _LockAndExecLU(self, lu, level, calc_timeout):
210 298
    """Execute a Logical Unit, with the needed locks.
211 299

  
212 300
    This is a recursive function that starts locking the given level, and
......
221 309
        self._cbs.NotifyStart()
222 310

  
223 311
      result = self._ExecLU(lu)
312

  
224 313
    elif adding_locks and acquiring_locks:
225 314
      # We could both acquire and add locks at the same level, but for now we
226 315
      # don't need this, so we'll avoid the complicated code needed.
227
      raise NotImplementedError(
228
        "Can't declare locks to acquire when adding others")
316
      raise NotImplementedError("Can't declare locks to acquire when adding"
317
                                " others")
318

  
229 319
    elif adding_locks or acquiring_locks:
230 320
      lu.DeclareLocks(level)
231 321
      share = lu.share_locks[level]
232
      if acquiring_locks:
233
        needed_locks = lu.needed_locks[level]
234

  
235
        self._ReportLocks(level, needed_locks, share, False)
236
        lu.acquired_locks[level] = self.context.glm.acquire(level,
237
                                                            needed_locks,
238
                                                            shared=share)
239
        self._ReportLocks(level, needed_locks, share, True)
240

  
241
      else: # adding_locks
242
        add_locks = lu.add_locks[level]
243
        lu.remove_locks[level] = add_locks
244
        try:
245
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
246
        except errors.LockError:
247
          raise errors.OpPrereqError(
248
            "Couldn't add locks (%s), probably because of a race condition"
249
            " with another job, who added them first" % add_locks)
322

  
250 323
      try:
324
        assert adding_locks ^ acquiring_locks, \
325
          "Locks must be either added or acquired"
326

  
327
        if acquiring_locks:
328
          # Acquiring locks
329
          needed_locks = lu.needed_locks[level]
330

  
331
          self._ReportLocks(level, needed_locks, share, False)
332
          acquired = self.context.glm.acquire(level,
333
                                              needed_locks,
334
                                              shared=share,
335
                                              timeout=calc_timeout())
336
          # TODO: Report timeout
337
          self._ReportLocks(level, needed_locks, share, True)
338

  
339
          if acquired is None:
340
            raise _LockAcquireTimeout()
341

  
342
          lu.acquired_locks[level] = acquired
343

  
344
        else:
345
          # Adding locks
346
          add_locks = lu.add_locks[level]
347
          lu.remove_locks[level] = add_locks
348

  
349
          try:
350
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
351
          except errors.LockError:
352
            raise errors.OpPrereqError(
353
              "Couldn't add locks (%s), probably because of a race condition"
354
              " with another job, who added them first" % add_locks)
355

  
356
          lu.acquired_locks[level] = add_locks
251 357
        try:
252
          if adding_locks:
253
            lu.acquired_locks[level] = add_locks
254
          result = self._LockAndExecLU(lu, level + 1)
358
          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
255 359
        finally:
256 360
          if level in lu.remove_locks:
257 361
            self.context.glm.remove(level, lu.remove_locks[level])
258 362
      finally:
259 363
        if self.context.glm.is_owned(level):
260 364
          self.context.glm.release(level)
365

  
261 366
    else:
262
      result = self._LockAndExecLU(lu, level + 1)
367
      result = self._LockAndExecLU(lu, level + 1, calc_timeout)
263 368

  
264 369
    return result
265 370

  
......
282 387
      if lu_class is None:
283 388
        raise errors.OpCodeUnknown("Unknown opcode")
284 389

  
285
      # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
286
      # shared fashion otherwise (to prevent concurrent run with an exclusive
287
      # LU.
288
      self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
289
                        not lu_class.REQ_BGL, False)
290
      try:
291
        self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
292
                                 shared=not lu_class.REQ_BGL)
293
      finally:
294
        self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
295
                          not lu_class.REQ_BGL, True)
296
      try:
297
        lu = lu_class(self, op, self.context, self.rpc)
298
        lu.ExpandNames()
299
        assert lu.needed_locks is not None, "needed_locks not set by LU"
300
        result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
301
      finally:
302
        self.context.glm.release(locking.LEVEL_CLUSTER)
390
      timeout_strategy = _LockTimeoutStrategy()
391
      calc_timeout = timeout_strategy.CalcRemainingTimeout
392

  
393
      while True:
394
        try:
395
          self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
396
                            not lu_class.REQ_BGL, False)
397
          try:
398
            # Acquire the Big Ganeti Lock exclusively if this LU requires it,
399
            # and in a shared fashion otherwise (to prevent concurrent run with
400
            # an exclusive LU).
401
            acquired_bgl = self.context.glm.acquire(locking.LEVEL_CLUSTER,
402
                                                    [locking.BGL],
403
                                                    shared=not lu_class.REQ_BGL,
404
                                                    timeout=calc_timeout())
405
          finally:
406
            # TODO: Report timeout
407
            self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
408
                              not lu_class.REQ_BGL, True)
409

  
410
          if acquired_bgl is None:
411
            raise _LockAcquireTimeout()
412

  
413
          try:
414
            lu = lu_class(self, op, self.context, self.rpc)
415
            lu.ExpandNames()
416
            assert lu.needed_locks is not None, "needed_locks not set by LU"
417

  
418
            return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
419
          finally:
420
            self.context.glm.release(locking.LEVEL_CLUSTER)
421

  
422
        except _LockAcquireTimeout:
423
          # Timeout while waiting for lock, try again
424
          pass
425

  
426
        timeout_strategy.NextAttempt()
427

  
303 428
    finally:
304 429
      self._cbs = None
305 430

  
306
    return result
307

  
308 431
  def _Feedback(self, *args):
309 432
    """Forward call to feedback callback function.
310 433

  
b/test/ganeti.mcpu_unittest.py
1
#!/usr/bin/python
2
#
3

  
4
# Copyright (C) 2009 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Script for unittesting the mcpu module"""
23

  
24

  
25
import unittest
26

  
27
from ganeti import mcpu
28

  
29

  
30
class TestLockTimeoutStrategy(unittest.TestCase):
  """Tests for L{mcpu._LockTimeoutStrategy}.

  """
  def testConstants(self):
    """Checks that the strategy's tuning constants have sensible values.

    """
    self.assertTrue(mcpu._LockTimeoutStrategy._MAX_ATTEMPTS > 0)
    self.assertTrue(mcpu._LockTimeoutStrategy._ATTEMPT_FACTOR > 1.0)

  def testSimple(self):
    """Walks through all attempts and checks the returned timeouts.

    """
    # A random function returning 0.5 makes the random variation cancel
    # out, so consecutive timeouts can be compared with each other
    strat = mcpu._LockTimeoutStrategy(_random_fn=lambda: 0.5)

    self.assertEqual(strat._attempts, 0)

    prev = None
    for _ in range(strat._MAX_ATTEMPTS):
      timeout = strat.CalcRemainingTimeout()
      self.assertTrue(timeout is not None)

      # Timeouts are capped and must not shrink from attempt to attempt
      self.assertTrue(timeout <= 10.0)
      self.assertTrue(prev is None or timeout >= prev)

      strat.NextAttempt()

      prev = timeout

    # All timed attempts are used up, no timeout is returned anymore
    self.assertTrue(strat.CalcRemainingTimeout() is None)
53

  
54

  
55
# Run all tests of this module when executed as a script
if __name__ == "__main__":
  unittest.main()

Also available in: Unified diff