Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ 59b4eeef

History | View | Annotate | Download (8.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import select
26
import signal
27
import errno
28
import time
29

    
30
from ganeti import utils
31

    
32

    
33
class Timer(object):
34
  def __init__(self, owner, timer_id, start, interval, repeat):
35
    self.owner = owner
36
    self.timer_id = timer_id
37
    self.start = start
38
    self.interval = interval
39
    self.repeat = repeat
40

    
41

    
42
class Mainloop(object):
43
  """Generic mainloop for daemons
44

45
  """
46
  def __init__(self):
47
    """Constructs a new Mainloop instance.
48

49
    """
50
    self._io_wait = {}
51
    self._io_wait_add = []
52
    self._io_wait_remove = []
53
    self._signal_wait = []
54
    self._timer_id_last = 0
55
    self._timer = {}
56
    self._timer_add = []
57
    self._timer_remove = []
58

    
59
  def Run(self, handle_sigchld=True, handle_sigterm=True, stop_on_empty=False):
60
    """Runs the mainloop.
61

62
    @type handle_sigchld: bool
63
    @param handle_sigchld: Whether to install handler for SIGCHLD
64
    @type handle_sigterm: bool
65
    @param handle_sigterm: Whether to install handler for SIGTERM
66
    @type stop_on_empty: bool
67
    @param stop_on_empty: Whether to stop mainloop once all I/O waiters
68
                          unregistered
69

70
    """
71
    poller = select.poll()
72

    
73
    # Setup signal handlers
74
    if handle_sigchld:
75
      sigchld_handler = utils.SignalHandler([signal.SIGCHLD])
76
    else:
77
      sigchld_handler = None
78
    try:
79
      if handle_sigterm:
80
        sigterm_handler = utils.SignalHandler([signal.SIGTERM])
81
      else:
82
        sigterm_handler = None
83

    
84
      try:
85
        running = True
86
        timeout = None
87
        timeout_needs_update = True
88

    
89
        # Start actual main loop
90
        while running:
91
          # Entries could be added again afterwards, hence removing first
92
          if self._io_wait_remove:
93
            for fd in self._io_wait_remove:
94
              try:
95
                poller.unregister(fd)
96
              except KeyError:
97
                pass
98
              try:
99
                del self._io_wait[fd]
100
              except KeyError:
101
                pass
102
            self._io_wait_remove = []
103

    
104
          # Add new entries
105
          if self._io_wait_add:
106
            for (owner, fd, conditions) in self._io_wait_add:
107
              self._io_wait[fd] = owner
108
              poller.register(fd, conditions)
109
            self._io_wait_add = []
110

    
111
          # Add new timers
112
          if self._timer_add:
113
            timeout_needs_update = True
114
            for timer in self._timer_add:
115
              self._timer[timer.timer_id] = timer
116
            del self._timer_add[:]
117

    
118
          # Remove timers
119
          if self._timer_remove:
120
            timeout_needs_update = True
121
            for timer_id in self._timer_remove:
122
              try:
123
                del self._timer[timer_id]
124
              except KeyError:
125
                pass
126
            del self._timer_remove[:]
127

    
128
          # Stop if nothing is listening anymore
129
          if stop_on_empty and not (self._io_wait or self._timer):
130
            break
131

    
132
          # Calculate timeout again if required
133
          if timeout_needs_update:
134
            timeout = self._CalcTimeout(time.time())
135
            timeout_needs_update = False
136

    
137
          # Wait for I/O events
138
          try:
139
            io_events = poller.poll(timeout)
140
          except select.error, err:
141
            # EINTR can happen when signals are sent
142
            if err.args and err.args[0] in (errno.EINTR,):
143
              io_events = None
144
            else:
145
              raise
146

    
147
          after_poll = time.time()
148

    
149
          if io_events:
150
            # Check for I/O events
151
            for (evfd, evcond) in io_events:
152
              owner = self._io_wait.get(evfd, None)
153
              if owner:
154
                owner.OnIO(evfd, evcond)
155

    
156
          if self._timer:
157
            self._CheckTimers(after_poll)
158

    
159
          # Check whether signal was raised
160
          if sigchld_handler and sigchld_handler.called:
161
            self._CallSignalWaiters(signal.SIGCHLD)
162
            sigchld_handler.Clear()
163

    
164
          if sigterm_handler and sigterm_handler.called:
165
            self._CallSignalWaiters(signal.SIGTERM)
166
            running = False
167
            sigterm_handler.Clear()
168
      finally:
169
        # Restore signal handlers
170
        if sigterm_handler:
171
          sigterm_handler.Reset()
172
    finally:
173
      if sigchld_handler:
174
        sigchld_handler.Reset()
175

    
176
  def _CalcTimeout(self, now):
177
    if not self._timer:
178
      return None
179

    
180
    timeout = None
181

    
182
    # TODO: Repeating timers
183

    
184
    min_timeout = 0.001
185

    
186
    for timer in self._timer.itervalues():
187
      time_left = (timer.start + timer.interval) - now
188
      if timeout is None or time_left < timeout:
189
        timeout = time_left
190
      if timeout < 0:
191
        timeout = 0
192
        break
193
      elif timeout < min_timeout:
194
        timeout = min_timeout
195
        break
196

    
197
    return timeout * 1000.0
198

    
199
  def _CheckTimers(self, now):
200
    # TODO: Repeating timers
201
    for timer in self._timer.itervalues():
202
      if now < (timer.start + timer.interval):
203
        continue
204

    
205
      timer.owner.OnTimer(timer.timer_id)
206

    
207
      # TODO: Repeating timers should not be removed
208
      self._timer_remove.append(timer.timer_id)
209

    
210
  def _CallSignalWaiters(self, signum):
211
    """Calls all signal waiters for a certain signal.
212

213
    @type signum: int
214
    @param signum: Signal number
215

216
    """
217
    for owner in self._signal_wait:
218
      owner.OnSignal(signal.SIGCHLD)
219

    
220
  def RegisterIO(self, owner, fd, condition):
221
    """Registers a receiver for I/O notifications
222

223
    The receiver must support a "OnIO(self, fd, conditions)" function.
224

225
    @type owner: instance
226
    @param owner: Receiver
227
    @type fd: int
228
    @param fd: File descriptor
229
    @type condition: int
230
    @param condition: ORed field of conditions to be notified
231
                      (see select module)
232

233
    """
234
    # select.Poller also supports file() like objects, but we don't.
235
    assert isinstance(fd, (int, long)), \
236
      "Only integers are supported for file descriptors"
237

    
238
    self._io_wait_add.append((owner, fd, condition))
239

    
240
  def UnregisterIO(self, fd):
241
    """Unregister a file descriptor.
242

243
    It'll be unregistered the next time the mainloop checks for it.
244

245
    @type fd: int
246
    @param fd: File descriptor
247

248
    """
249
    # select.Poller also supports file() like objects, but we don't.
250
    assert isinstance(fd, (int, long)), \
251
      "Only integers are supported for file descriptors"
252

    
253
    self._io_wait_remove.append(fd)
254

    
255
  def RegisterSignal(self, owner):
256
    """Registers a receiver for signal notifications
257

258
    The receiver must support a "OnSignal(self, signum)" function.
259

260
    @type owner: instance
261
    @param owner: Receiver
262

263
    """
264
    self._signal_wait.append(owner)
265

    
266
  def AddTimer(self, owner, interval, repeat):
267
    """Add a new timer.
268

269
    The receiver must support a "OnTimer(self, timer_id)" function.
270

271
    @type owner: instance
272
    @param owner: Receiver
273
    @type interval: int or float
274
    @param interval: Timer interval in seconds
275
    @type repeat: bool
276
    @param repeat: Whether this is a repeating timer or one-off
277

278
    """
279
    # TODO: Implement repeating timers
280
    assert not repeat, "Repeating timers are not yet supported"
281

    
282
    # Get new ID
283
    self._timer_id_last += 1
284

    
285
    timer_id = self._timer_id_last
286

    
287
    self._timer_add.append(Timer(owner, timer_id, time.time(),
288
                                 float(interval), repeat))
289

    
290
    return timer_id
291

    
292
  def RemoveTimer(self, timer_id):
293
    """Removes a timer.
294

295
    @type timer_id: int
296
    @param timer_id: Timer ID
297

298
    """
299
    self._timer_remove.append(timer_id)