Revision d76167a5

b/lib/locking.py
20 20

  
21 21
"""Module implementing the Ganeti locking code."""
22 22

  
23
import os
24
import select
23 25
import threading
26
import time
27
import errno
24 28

  
25 29
from ganeti import errors
26 30
from ganeti import utils
......
45 49
  return wrap
46 50

  
47 51

  
52
class _SingleActionPipeConditionWaiter(object):
53
  """Callable helper class for _SingleActionPipeCondition.
54

  
55
  """
56
  __slots__ = [
57
    "_cond",
58
    "_fd",
59
    "_poller",
60
    ]
61

  
62
  def __init__(self, cond, poller, fd):
63
    """Initializes this class.
64

  
65
    @type cond: L{_SingleActionPipeCondition}
66
    @param cond: Parent condition
67
    @type poller: select.poll
68
    @param poller: Poller object
69
    @type fd: int
70
    @param fd: File descriptor to wait for
71

  
72
    """
73
    object.__init__(self)
74

  
75
    self._cond = cond
76
    self._poller = poller
77
    self._fd = fd
78

  
79
  def __call__(self, timeout):
80
    """Wait for something to happen on the pipe.
81

  
82
    @type timeout: float or None
83
    @param timeout: Timeout for waiting (can be None)
84

  
85
    """
86
    start_time = time.time()
87
    remaining_time = timeout
88

  
89
    while timeout is None or remaining_time > 0:
90
      try:
91
        result = self._poller.poll(remaining_time)
92
      except EnvironmentError, err:
93
        if err.errno != errno.EINTR:
94
          raise
95
        result = None
96

  
97
      # Check whether we were notified
98
      if result and result[0][0] == self._fd:
99
        break
100

  
101
      # Re-calculate timeout if necessary
102
      if timeout is not None:
103
        remaining_time = start_time + timeout - time.time()
104

  
105

  
106
class _SingleActionPipeCondition(object):
107
  """Wrapper around a pipe for usage inside conditions.
108

  
109
  This class contains a POSIX pipe(2) and a poller to poll it. The pipe is
110
  always allocated when constructing the class. Extra care is taken to always
111
  close the file descriptors.
112

  
113
  An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for
114
  notifications.
115

  
116
  Warning: This class is designed to be used as the underlying component of a
117
  locking condition, but is not by itself thread safe, and needs to be
118
  protected by an external lock.
119

  
120
  """
121
  __slots__ = [
122
    "_poller",
123
    "_read_fd",
124
    "_write_fd",
125
    "_nwaiters",
126
    ]
127

  
128
  _waiter_class = _SingleActionPipeConditionWaiter
129

  
130
  def __init__(self):
131
    """Initializes this class.
132

  
133
    """
134
    object.__init__(self)
135

  
136
    self._nwaiters = 0
137

  
138
    # Just assume the unpacking is successful, otherwise error handling gets
139
    # very complicated.
140
    (self._read_fd, self._write_fd) = os.pipe()
141
    try:
142
      # The poller looks for closure of the write side
143
      poller = select.poll()
144
      poller.register(self._read_fd, select.POLLHUP)
145

  
146
      self._poller = poller
147
    except:
148
      if self._read_fd is not None:
149
        os.close(self._read_fd)
150
      if self._write_fd is not None:
151
        os.close(self._write_fd)
152
      raise
153

  
154
    # There should be no code here anymore, otherwise the pipe file descriptors
155
    # may be not be cleaned up properly in case of errors.
156

  
157
  def StartWaiting(self):
158
    """Return function to wait for notification.
159

  
160
    @rtype: L{_SingleActionPipeConditionWaiter}
161
    @return: Function to wait for notification
162

  
163
    """
164
    assert self._nwaiters >= 0
165

  
166
    if self._poller is None:
167
      raise RuntimeError("Already cleaned up")
168

  
169
    # Create waiter function and increase number of waiters
170
    wait_fn = self._waiter_class(self, self._poller, self._read_fd)
171
    self._nwaiters += 1
172
    return wait_fn
173

  
174
  def DoneWaiting(self):
175
    """Decrement number of waiters and automatic cleanup.
176

  
177
    Must be called after waiting for a notification.
178

  
179
    @rtype: bool
180
    @return: Whether this was the last waiter
181

  
182
    """
183
    assert self._nwaiters > 0
184

  
185
    self._nwaiters -= 1
186

  
187
    if self._nwaiters == 0:
188
      self._Cleanup()
189
      return True
190

  
191
    return False
192

  
193
  def notifyAll(self):
194
    """Close the writing side of the pipe to notify all waiters.
195

  
196
    """
197
    if self._write_fd is None:
198
      raise RuntimeError("Can only notify once")
199

  
200
    os.close(self._write_fd)
201
    self._write_fd = None
202

  
203
  def _Cleanup(self):
204
    """Close all file descriptors.
205

  
206
    """
207
    if self._read_fd is not None:
208
      os.close(self._read_fd)
209
      self._read_fd = None
210

  
211
    if self._write_fd is not None:
212
      os.close(self._write_fd)
213
      self._write_fd = None
214

  
215
    self._poller = None
216

  
217
  def __del__(self):
218
    """Called on object deletion.
219

  
220
    Ensure no file descriptors are left open.
221

  
222
    """
223
    self._Cleanup()
224

  
225

  
48 226
class _CountingCondition(object):
49 227
  """Wrapper for Python's built-in threading.Condition class.
50 228

  
b/test/ganeti.locking_unittest.py
69 69
    self.threads = []
70 70

  
71 71

  
72
class TestSingleActionPipeCondition(unittest.TestCase):
73
  """_SingleActionPipeCondition tests"""
74

  
75
  def setUp(self):
76
    self.cond = locking._SingleActionPipeCondition()
77

  
78
  def testInitialization(self):
79
    self.assert_(self.cond._read_fd is not None)
80
    self.assert_(self.cond._write_fd is not None)
81
    self.assert_(self.cond._poller is not None)
82
    self.assertEqual(self.cond._nwaiters, 0)
83

  
84
  def testUsageCount(self):
85
    self.cond.StartWaiting()
86
    self.assert_(self.cond._read_fd is not None)
87
    self.assert_(self.cond._write_fd is not None)
88
    self.assert_(self.cond._poller is not None)
89
    self.assertEqual(self.cond._nwaiters, 1)
90

  
91
    # use again
92
    self.cond.StartWaiting()
93
    self.assertEqual(self.cond._nwaiters, 2)
94

  
95
    # there is more than one user
96
    self.assert_(not self.cond.DoneWaiting())
97
    self.assert_(self.cond._read_fd is not None)
98
    self.assert_(self.cond._write_fd is not None)
99
    self.assert_(self.cond._poller is not None)
100
    self.assertEqual(self.cond._nwaiters, 1)
101

  
102
    self.assert_(self.cond.DoneWaiting())
103
    self.assertEqual(self.cond._nwaiters, 0)
104
    self.assert_(self.cond._read_fd is None)
105
    self.assert_(self.cond._write_fd is None)
106
    self.assert_(self.cond._poller is None)
107

  
108
  def testNotify(self):
109
    wait1 = self.cond.StartWaiting()
110
    wait2 = self.cond.StartWaiting()
111

  
112
    self.assert_(self.cond._read_fd is not None)
113
    self.assert_(self.cond._write_fd is not None)
114
    self.assert_(self.cond._poller is not None)
115

  
116
    self.cond.notifyAll()
117

  
118
    self.assert_(self.cond._read_fd is not None)
119
    self.assert_(self.cond._write_fd is None)
120
    self.assert_(self.cond._poller is not None)
121

  
122
    self.assert_(not self.cond.DoneWaiting())
123

  
124
    self.assert_(self.cond._read_fd is not None)
125
    self.assert_(self.cond._write_fd is None)
126
    self.assert_(self.cond._poller is not None)
127

  
128
    self.assert_(self.cond.DoneWaiting())
129

  
130
    self.assert_(self.cond._read_fd is None)
131
    self.assert_(self.cond._write_fd is None)
132
    self.assert_(self.cond._poller is None)
133

  
134
  def testReusage(self):
135
    self.cond.StartWaiting()
136
    self.assert_(self.cond._read_fd is not None)
137
    self.assert_(self.cond._write_fd is not None)
138
    self.assert_(self.cond._poller is not None)
139

  
140
    self.assert_(self.cond.DoneWaiting())
141

  
142
    self.assertRaises(RuntimeError, self.cond.StartWaiting)
143
    self.assert_(self.cond._read_fd is None)
144
    self.assert_(self.cond._write_fd is None)
145
    self.assert_(self.cond._poller is None)
146

  
147
  def testNotifyTwice(self):
148
    self.cond.notifyAll()
149
    self.assertRaises(RuntimeError, self.cond.notifyAll)
150

  
151

  
72 152
class TestSharedLock(_ThreadedTestCase):
73 153
  """SharedLock tests"""
74 154

  

Also available in: Unified diff