Revision 84e344d4 lib/locking.py

b/lib/locking.py
20 20

  
21 21
"""Module implementing the Ganeti locking code."""
22 22

  
23
# pylint: disable-msg=W0613,W0201
24

  
25 23
import threading
26
# Wouldn't it be better to define LockingError in the locking module?
27
# Well, for now that's how the rest of the code does it...
24

  
28 25
from ganeti import errors
29 26
from ganeti import utils
30 27

  
......
48 45
  return wrap
49 46

  
50 47

  
51
class SharedLock:
48
class _CountingCondition(object):
49
  """Wrapper for Python's built-in threading.Condition class.
50

  
51
  This wrapper keeps a count of active waiters. We can't access the internal
52
  "__waiters" attribute of threading.Condition because it's not thread-safe.
53

  
54
  """
55
  __slots__ = [
56
    "_cond",
57
    "_nwaiters",
58
    ]
59

  
60
  def __init__(self, lock):
61
    """Initializes this class.
62

  
63
    """
64
    object.__init__(self)
65
    self._cond = threading.Condition(lock=lock)
66
    self._nwaiters = 0
67

  
68
  def notifyAll(self):
69
    """Notifies the condition.
70

  
71
    """
72
    return self._cond.notifyAll()
73

  
74
  def wait(self, timeout=None):
75
    """Waits for the condition to be notified.
76

  
77
    @type timeout: float or None
78
    @param timeout: Timeout in seconds
79

  
80
    """
81
    assert self._nwaiters >= 0
82

  
83
    self._nwaiters += 1
84
    try:
85
      return self._cond.wait(timeout=timeout)
86
    finally:
87
      self._nwaiters -= 1
88

  
89
  def has_waiting(self):
90
    """Returns whether there are active waiters.
91

  
92
    """
93
    return bool(self._nwaiters)
94

  
95

  
96
class SharedLock(object):
52 97
  """Implements a shared lock.
53 98

  
54 99
  Multiple threads can acquire the lock in a shared way, calling
......
60 105
  eventually do so.
61 106

  
62 107
  """
108
  __slots__ = [
109
    "__active_shr_c",
110
    "__inactive_shr_c",
111
    "__deleted",
112
    "__exc",
113
    "__lock",
114
    "__pending",
115
    "__shr",
116
    ]
117

  
118
  __condition_class = _CountingCondition
119

  
63 120
  def __init__(self):
64
    """Construct a new SharedLock"""
65
    # we have two conditions, c_shr and c_exc, sharing the same lock.
121
    """Construct a new SharedLock.
122

  
123
    """
124
    object.__init__(self)
125

  
126
    # Internal lock
66 127
    self.__lock = threading.Lock()
67
    self.__turn_shr = threading.Condition(self.__lock)
68
    self.__turn_exc = threading.Condition(self.__lock)
69 128

  
70
    # current lock holders
129
    # Queue containing waiting acquires
130
    self.__pending = []
131

  
132
    # Active and inactive conditions for shared locks
133
    self.__active_shr_c = self.__condition_class(self.__lock)
134
    self.__inactive_shr_c = self.__condition_class(self.__lock)
135

  
136
    # Current lock holders
71 137
    self.__shr = set()
72 138
    self.__exc = None
73 139

  
74
    # lock waiters
75
    self.__nwait_exc = 0
76
    self.__nwait_shr = 0
77
    self.__npass_shr = 0
78

  
79 140
    # is this lock in the deleted state?
80 141
    self.__deleted = False
81 142

  
143
  def __check_deleted(self):
144
    """Raises an exception if the lock has been deleted.
145

  
146
    """
147
    if self.__deleted:
148
      raise errors.LockError("Deleted lock")
149

  
82 150
  def __is_sharer(self):
83
    """Is the current thread sharing the lock at this time?"""
151
    """Is the current thread sharing the lock at this time?
152

  
153
    """
84 154
    return threading.currentThread() in self.__shr
85 155

  
86 156
  def __is_exclusive(self):
87
    """Is the current thread holding the lock exclusively at this time?"""
157
    """Is the current thread holding the lock exclusively at this time?
158

  
159
    """
88 160
    return threading.currentThread() == self.__exc
89 161

  
90 162
  def __is_owned(self, shared=-1):
......
112 184
    """
113 185
    self.__lock.acquire()
114 186
    try:
115
      result = self.__is_owned(shared=shared)
187
      return self.__is_owned(shared=shared)
116 188
    finally:
117 189
      self.__lock.release()
118 190

  
119
    return result
120

  
121
  def __wait(self, c):
122
    """Wait on the given condition, and raise an exception if the current lock
123
    is declared deleted in the meantime.
191
  def _count_pending(self):
192
    """Returns the number of pending acquires.
124 193

  
125
    @param c: the condition to wait on
194
    @rtype: int
126 195

  
127 196
    """
128
    c.wait()
129
    if self.__deleted:
130
      raise errors.LockError('deleted lock')
197
    self.__lock.acquire()
198
    try:
199
      return len(self.__pending)
200
    finally:
201
      self.__lock.release()
131 202

  
132
  def __exclusive_acquire(self):
133
    """Acquire the lock exclusively.
203
  def __do_acquire(self, shared):
204
    """Actually acquire the lock.
134 205

  
135
    This is a private function that presumes you are already holding the
136
    internal lock. It's defined separately to avoid code duplication between
137
    acquire() and delete()
206
    """
207
    if shared:
208
      self.__shr.add(threading.currentThread())
209
    else:
210
      self.__exc = threading.currentThread()
211

  
212
  def __can_acquire(self, shared):
213
    """Determine whether lock can be acquired.
138 214

  
139 215
    """
140
    self.__nwait_exc += 1
141
    try:
142
      # This is to save ourselves from a nasty race condition that could
143
      # theoretically make the sharers starve.
144
      if self.__nwait_shr > 0 or self.__nwait_exc > 1:
145
        self.__wait(self.__turn_exc)
216
    if shared:
217
      return self.__exc is None
218
    else:
219
      return len(self.__shr) == 0 and self.__exc is None
146 220

  
147
      while len(self.__shr) > 0 or self.__exc is not None:
148
        self.__wait(self.__turn_exc)
221
  def __is_on_top(self, cond):
222
    """Checks whether the passed condition is on top of the queue.
149 223

  
150
      self.__exc = threading.currentThread()
151
    finally:
152
      self.__nwait_exc -= 1
224
    The caller must make sure the queue isn't empty.
153 225

  
154
    assert self.__npass_shr == 0, "SharedLock: internal fairness violation"
226
    """
227
    return self.__pending[0] == cond
155 228

  
156
  def __shared_acquire(self):
157
    """Acquire the lock in shared mode
229
  def __acquire_unlocked(self, shared=0, timeout=None):
230
    """Acquire a shared lock.
158 231

  
159
    This is a private function that presumes you are already holding the
160
    internal lock.
232
    @param shared: whether to acquire in shared mode; by default an
233
        exclusive lock will be acquired
234
    @param timeout: maximum waiting time before giving up
161 235

  
162 236
    """
163
    self.__nwait_shr += 1
164
    try:
165
      wait = False
166
      # If there is an exclusive holder waiting we have to wait.
167
      # We'll only do this once, though, when we start waiting for
168
      # the lock. Then we'll just wait while there are no
169
      # exclusive holders.
170
      if self.__nwait_exc > 0:
171
        # TODO: if !blocking...
172
        wait = True
173
        self.__wait(self.__turn_shr)
174

  
175
      while self.__exc is not None:
176
        wait = True
177
        # TODO: if !blocking...
178
        self.__wait(self.__turn_shr)
237
    self.__check_deleted()
179 238

  
180
      self.__shr.add(threading.currentThread())
239
    # We cannot acquire the lock if we already have it
240
    assert not self.__is_owned(), "double acquire() on a non-recursive lock"
241

  
242
    # Check whether someone else holds the lock or there are pending acquires.
243
    if not self.__pending and self.__can_acquire(shared):
244
      # Apparently not, can acquire lock directly.
245
      self.__do_acquire(shared)
246
      return True
181 247

  
182
      # If we were waiting note that we passed
183
      if wait:
184
        self.__npass_shr -= 1
248
    if shared:
249
      wait_condition = self.__active_shr_c
185 250

  
251
      # Check if we're not yet in the queue
252
      if wait_condition not in self.__pending:
253
        self.__pending.append(wait_condition)
254
    else:
255
      wait_condition = self.__condition_class(self.__lock)
256
      # Always add to queue
257
      self.__pending.append(wait_condition)
258

  
259
    try:
260
      # Wait until we become the topmost acquire in the queue or the timeout
261
      # expires.
262
      while not (self.__is_on_top(wait_condition) and
263
                 self.__can_acquire(shared)):
264
        # Wait for notification
265
        wait_condition.wait(timeout)
266
        self.__check_deleted()
267

  
268
        # A lot of code assumes blocking acquires always succeed. Loop
269
        # internally for that case.
270
        if timeout is not None:
271
          break
272

  
273
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
274
        self.__do_acquire(shared)
275
        return True
186 276
    finally:
187
      self.__nwait_shr -= 1
277
      # Remove condition from queue if there are no more waiters
278
      if not wait_condition.has_waiting() and not self.__deleted:
279
        self.__pending.remove(wait_condition)
188 280

  
189
    assert self.__npass_shr >= 0, "Internal fairness condition weirdness"
281
    return False
190 282

  
191
  def acquire(self, blocking=1, shared=0):
283
  def acquire(self, shared=0, timeout=None):
192 284
    """Acquire a shared lock.
193 285

  
286
    @type shared: int
194 287
    @param shared: whether to acquire in shared mode; by default an
195 288
        exclusive lock will be acquired
196
    @param blocking: whether to block while trying to acquire or to
197
        operate in try-lock mode (this locking mode is not supported yet)
289
    @type timeout: float
290
    @param timeout: maximum waiting time before giving up
198 291

  
199 292
    """
200
    if not blocking:
201
      # We don't have non-blocking mode for now
202
      raise NotImplementedError
203

  
204 293
    self.__lock.acquire()
205 294
    try:
206
      if self.__deleted:
207
        raise errors.LockError('deleted lock')
208

  
209
      # We cannot acquire the lock if we already have it
210
      assert not self.__is_owned(), "double acquire() on a non-recursive lock"
211
      assert self.__npass_shr >= 0, "Internal fairness condition weirdness"
212

  
213
      if shared:
214
        self.__shared_acquire()
215
      else:
216
        # TODO: if !blocking...
217
        # (or modify __exclusive_acquire for non-blocking mode)
218
        self.__exclusive_acquire()
219

  
295
      return self.__acquire_unlocked(shared, timeout)
220 296
    finally:
221 297
      self.__lock.release()
222 298

  
223
    return True
224

  
225 299
  def release(self):
226 300
    """Release a Shared Lock.
227 301

  
......
231 305
    """
232 306
    self.__lock.acquire()
233 307
    try:
234
      assert self.__npass_shr >= 0, "Internal fairness condition weirdness"
308
      assert self.__is_exclusive() or self.__is_sharer(), \
309
        "Cannot release non-owned lock"
310

  
235 311
      # Autodetect release type
236 312
      if self.__is_exclusive():
237 313
        self.__exc = None
238

  
239
        # An exclusive holder has just had the lock, time to put it in shared
240
        # mode if there are shared holders waiting. Otherwise wake up the next
241
        # exclusive holder.
242
        if self.__nwait_shr > 0:
243
          # Make sure at least the ones which were blocked pass.
244
          self.__npass_shr = self.__nwait_shr
245
          self.__turn_shr.notifyAll()
246
        elif self.__nwait_exc > 0:
247
          self.__turn_exc.notify()
248

  
249
      elif self.__is_sharer():
314
      else:
250 315
        self.__shr.remove(threading.currentThread())
251 316

  
252
        # If there are shared holders waiting (and not just scheduled to pass)
253
        # there *must* be an exclusive holder waiting as well; otherwise what
254
        # were they waiting for?
255
        assert (self.__nwait_exc > 0 or
256
                self.__npass_shr == self.__nwait_shr), \
257
                "Lock sharers waiting while no exclusive is queueing"
258

  
259
        # If there are no more shared holders either in or scheduled to pass,
260
        # and some exclusive holders are waiting let's wake one up.
261
        if (len(self.__shr) == 0 and
262
            self.__nwait_exc > 0 and
263
            not self.__npass_shr > 0):
264
          self.__turn_exc.notify()
317
      # Notify topmost condition in queue
318
      if self.__pending:
319
        first_condition = self.__pending[0]
320
        first_condition.notifyAll()
265 321

  
266
      else:
267
        assert False, "Cannot release non-owned lock"
322
        if first_condition == self.__active_shr_c:
323
          self.__active_shr_c = self.__inactive_shr_c
324
          self.__inactive_shr_c = first_condition
268 325

  
269 326
    finally:
270 327
      self.__lock.release()
271 328

  
272
  def delete(self, blocking=1):
329
  def delete(self, timeout=None):
273 330
    """Delete a Shared Lock.
274 331

  
275 332
    This operation will declare the lock for removal. First the lock will be
276 333
    acquired in exclusive mode if you don't already own it, then the lock
277 334
    will be put in a state where any future and pending acquire() fail.
278 335

  
279
    @param blocking: whether to block while trying to acquire or to
280
        operate in try-lock mode.  this locking mode is not supported
281
        yet unless you are already holding exclusively the lock.
336
    @type timeout: float
337
    @param timeout: maximum waiting time before giving up
282 338

  
283 339
    """
284 340
    self.__lock.acquire()
285 341
    try:
286
      assert not self.__is_sharer(), "cannot delete() a lock while sharing it"
342
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
343

  
344
      self.__check_deleted()
287 345

  
288
      if self.__deleted:
289
        raise errors.LockError('deleted lock')
346
      # The caller is allowed to hold the lock exclusively already.
347
      acquired = self.__is_exclusive()
290 348

  
291
      if not self.__is_exclusive():
292
        if not blocking:
293
          # We don't have non-blocking mode for now
294
          raise NotImplementedError
295
        self.__exclusive_acquire()
349
      if not acquired:
350
        acquired = self.__acquire_unlocked(timeout)
351

  
352
      if acquired:
353
        self.__deleted = True
354
        self.__exc = None
296 355

  
297
      self.__deleted = True
298
      self.__exc = None
299
      # Wake up everybody, they will fail acquiring the lock and
300
      # raise an exception instead.
301
      self.__turn_exc.notifyAll()
302
      self.__turn_shr.notifyAll()
356
        # Notify all acquires. They'll throw an error.
357
        while self.__pending:
358
          self.__pending.pop().notifyAll()
303 359

  
360
      return acquired
304 361
    finally:
305 362
      self.__lock.release()
306 363

  

Also available in: Unified diff