Revision a95fd5d7

b/lib/locking.py
23 23
# pylint: disable-msg=W0613,W0201
24 24

  
25 25
import threading
26
# Wouldn't it be better to define LockingError in the locking module?
27
# Well, for now that's how the rest of the code does it...
28
from ganeti import errors
26 29

  
27 30

  
28 31
class SharedLock:
......
52 55
    self.__nwait_exc = 0
53 56
    self.__nwait_shr = 0
54 57

  
58
    # is this lock in the deleted state?
59
    self.__deleted = False
60

  
55 61
  def __is_sharer(self):
56 62
    """Is the current thread sharing the lock at this time?"""
57 63
    return threading.currentThread() in self.__shr
......
92 98

  
93 99
    return result
94 100

  
101
  def __wait(self,c):
102
    """Wait on the given condition, and raise an exception if the current lock
103
    is declared deleted in the meantime.
104

  
105
    Args:
106
      c: condition to wait on
107

  
108
    """
109
    c.wait()
110
    if self.__deleted:
111
      raise errors.LockError('deleted lock')
112

  
113
  def __exclusive_acquire(self):
114
    """Acquire the lock exclusively.
115

  
116
    This is a private function that presumes you are already holding the
117
    internal lock. It's defined separately to avoid code duplication between
118
    acquire() and delete()
119

  
120
    """
121
    self.__nwait_exc += 1
122
    try:
123
      # This is to save ourselves from a nasty race condition that could
124
      # theoretically make the sharers starve.
125
      if self.__nwait_shr > 0 or self.__nwait_exc > 1:
126
        self.__wait(self.__turn_exc)
127

  
128
      while len(self.__shr) > 0 or self.__exc is not None:
129
        self.__wait(self.__turn_exc)
130

  
131
      self.__exc = threading.currentThread()
132
    finally:
133
      self.__nwait_exc -= 1
134

  
135

  
95 136
  def acquire(self, blocking=1, shared=0):
96 137
    """Acquire a shared lock.
97 138

  
......
108 149

  
109 150
    self.__lock.acquire()
110 151
    try:
152
      if self.__deleted:
153
        raise errors.LockError('deleted lock')
154

  
111 155
      # We cannot acquire the lock if we already have it
112 156
      assert not self.__is_owned(), "double acquire() on a non-recursive lock"
113 157

  
......
119 163
          # we'll just wait while there are no exclusive holders.
120 164
          if self.__nwait_exc > 0:
121 165
            # TODO: if !blocking...
122
            self.__turn_shr.wait()
166
            self.__wait(self.__turn_shr)
123 167

  
124 168
          while self.__exc is not None:
125 169
            # TODO: if !blocking...
126
            self.__turn_shr.wait()
170
            self.__wait(self.__turn_shr)
127 171

  
128 172
          self.__shr.add(threading.currentThread())
129 173
        finally:
130 174
          self.__nwait_shr -= 1
131 175

  
132 176
      else:
133
        self.__nwait_exc += 1
134
        try:
135
          # This is to save ourselves from a nasty race condition that could
136
          # theoretically make the sharers starve.
137
          if self.__nwait_shr > 0 or self.__nwait_exc > 1:
138
            # TODO: if !blocking...
139
              self.__turn_exc.wait()
140

  
141
          while len(self.__shr) > 0 or self.__exc is not None:
142
            # TODO: if !blocking...
143
            self.__turn_exc.wait()
144

  
145
          self.__exc = threading.currentThread()
146
        finally:
147
          self.__nwait_exc -= 1
177
        # TODO: if !blocking...
178
        # (or modify __exclusive_acquire for non-blocking mode)
179
        self.__exclusive_acquire()
148 180

  
149 181
    finally:
150 182
      self.__lock.release()
......
191 223
    finally:
192 224
      self.__lock.release()
193 225

  
226
  def delete(self, blocking=1):
227
    """Delete a Shared Lock.
228

  
229
    This operation will declare the lock for removal. First the lock will be
230
    acquired in exclusive mode if you don't already own it, then the lock
231
    will be put in a state where any future and pending acquire() fail.
232

  
233
    Args:
234
      blocking: whether to block while trying to acquire or to operate in
235
                try-lock mode.  this locking mode is not supported yet unless
236
                you are already holding exclusively the lock.
237

  
238
    """
239
    self.__lock.acquire()
240
    try:
241
      assert not self.__is_sharer(), "cannot delete() a lock while sharing it"
242

  
243
      if self.__deleted:
244
        raise errors.LockError('deleted lock')
245

  
246
      if not self.__is_exclusive():
247
        if not blocking:
248
          # We don't have non-blocking mode for now
249
          raise NotImplementedError
250
        self.__exclusive_acquire()
251

  
252
      self.__deleted = True
253
      self.__exc = None
254
      # Wake up everybody, they will fail acquiring the lock and
255
      # raise an exception instead.
256
      self.__turn_exc.notifyAll()
257
      self.__turn_shr.notifyAll()
258

  
259
    finally:
260
      self.__lock.release()
261

  
b/test/ganeti.locking_unittest.py
28 28
import Queue
29 29

  
30 30
from ganeti import locking
31
from ganeti import errors
31 32
from threading import Thread
32 33

  
33 34

  
......
86 87
  # helper functions: called in a separate thread they acquire the lock, send
87 88
  # their identifier on the done queue, then release it.
88 89
  def _doItSharer(self):
89
    self.sl.acquire(shared=1)
90
    self.done.put('SHR')
91
    self.sl.release()
90
    try:
91
      self.sl.acquire(shared=1)
92
      self.done.put('SHR')
93
      self.sl.release()
94
    except errors.LockError:
95
      self.done.put('ERR')
92 96

  
93 97
  def _doItExclusive(self):
94
    self.sl.acquire()
95
    self.done.put('EXC')
96
    self.sl.release()
98
    try:
99
      self.sl.acquire()
100
      self.done.put('EXC')
101
      self.sl.release()
102
    except errors.LockError:
103
      self.done.put('ERR')
104

  
105
  def _doItDelete(self):
106
    try:
107
      self.sl.acquire()
108
      self.done.put('DEL')
109
      self.sl.release()
110
    except errors.LockError:
111
      self.done.put('ERR')
97 112

  
98 113
  def testSharersCanCoexist(self):
99 114
    self.sl.acquire(shared=1)
......
109 124
    self.sl.release()
110 125
    self.assert_(self.done.get(True, 1))
111 126

  
127
  def testExclusiveBlocksDelete(self):
128
    self.sl.acquire()
129
    Thread(target=self._doItDelete).start()
130
    # give it a bit of time to check that it's not actually doing anything
131
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
132
    self.sl.release()
133
    self.assert_(self.done.get(True, 1))
134

  
112 135
  def testExclusiveBlocksSharer(self):
113 136
    self.sl.acquire()
114 137
    Thread(target=self._doItSharer).start()
......
125 148
    self.sl.release()
126 149
    self.assert_(self.done.get(True, 1))
127 150

  
151
  def testSharerBlocksDelete(self):
152
    self.sl.acquire(shared=1)
153
    Thread(target=self._doItDelete).start()
154
    time.sleep(0.05)
155
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
156
    self.sl.release()
157
    self.assert_(self.done.get(True, 1))
158

  
128 159
  def testWaitingExclusiveBlocksSharer(self):
129 160
    self.sl.acquire(shared=1)
130 161
    # the lock is acquired in shared mode...
......
153 184
    self.assertEqual(self.done.get(True, 1), 'SHR')
154 185
    self.assertEqual(self.done.get(True, 1), 'EXC')
155 186

  
187
  def testNoNonBlocking(self):
188
    self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0)
189
    self.assertRaises(NotImplementedError, self.sl.delete, blocking=0)
190
    self.sl.acquire()
191
    self.sl.delete(blocking=0) # Fine, because the lock is already acquired
192

  
193
  def testDelete(self):
194
    self.sl.delete()
195
    self.assertRaises(errors.LockError, self.sl.acquire)
196
    self.assertRaises(errors.LockError, self.sl.delete)
197

  
198
  def testDeletePendingSharersExclusiveDelete(self):
199
    self.sl.acquire()
200
    Thread(target=self._doItSharer).start()
201
    Thread(target=self._doItSharer).start()
202
    time.sleep(0.05)
203
    Thread(target=self._doItExclusive).start()
204
    Thread(target=self._doItDelete).start()
205
    time.sleep(0.05)
206
    self.sl.delete()
207
    # The two threads who were pending return both ERR
208
    self.assertEqual(self.done.get(True, 1), 'ERR')
209
    self.assertEqual(self.done.get(True, 1), 'ERR')
210
    self.assertEqual(self.done.get(True, 1), 'ERR')
211
    self.assertEqual(self.done.get(True, 1), 'ERR')
212

  
213
  def testDeletePendingDeleteExclusiveSharers(self):
214
    self.sl.acquire()
215
    Thread(target=self._doItDelete).start()
216
    Thread(target=self._doItExclusive).start()
217
    time.sleep(0.05)
218
    Thread(target=self._doItSharer).start()
219
    Thread(target=self._doItSharer).start()
220
    time.sleep(0.05)
221
    self.sl.delete()
222
    # The two threads who were pending return both ERR
223
    self.assertEqual(self.done.get(True, 1), 'ERR')
224
    self.assertEqual(self.done.get(True, 1), 'ERR')
225
    self.assertEqual(self.done.get(True, 1), 'ERR')
226
    self.assertEqual(self.done.get(True, 1), 'ERR')
227

  
156 228

  
157 229
if __name__ == '__main__':
158 230
  unittest.main()

Also available in: Unified diff