Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ 6b405598

History | View | Annotate | Download (8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import select
26
import signal
27
import errno
28
import time
29

    
30
from ganeti import utils
31

    
32

    
33
class Timer(object):
34
  def __init__(self, owner, timer_id, start, interval, repeat):
35
    self.owner = owner
36
    self.timer_id = timer_id
37
    self.start = start
38
    self.interval = interval
39
    self.repeat = repeat
40

    
41

    
42
class Mainloop(object):
43
  """Generic mainloop for daemons
44

45
  """
46
  def __init__(self):
47
    """Constructs a new Mainloop instance.
48

49
    """
50
    self._io_wait = {}
51
    self._io_wait_add = []
52
    self._io_wait_remove = []
53
    self._signal_wait = []
54
    self._timer_id_last = 0
55
    self._timer = {}
56
    self._timer_add = []
57
    self._timer_remove = []
58

    
59
  def Run(self, handle_sigchld=True, handle_sigterm=True, stop_on_empty=False):
60
    """Runs the mainloop.
61

62
    @type handle_sigchld: bool
63
    @param handle_sigchld: Whether to install handler for SIGCHLD
64
    @type handle_sigterm: bool
65
    @param handle_sigterm: Whether to install handler for SIGTERM
66
    @type stop_on_empty: bool
67
    @param stop_on_empty: Whether to stop mainloop once all I/O waiters
68
                          unregistered
69

70
    """
71
    poller = select.poll()
72

    
73
    # Setup signal handlers
74
    if handle_sigchld:
75
      sigchld_handler = utils.SignalHandler([signal.SIGCHLD])
76
    else:
77
      sigchld_handler = None
78
    try:
79
      if handle_sigterm:
80
        sigterm_handler = utils.SignalHandler([signal.SIGTERM])
81
      else:
82
        sigterm_handler = None
83

    
84
      try:
85
        running = True
86
        timeout = None
87
        timeout_needs_update = True
88

    
89
        # Start actual main loop
90
        while running:
91
          # Entries could be added again afterwards, hence removing first
92
          if self._io_wait_remove:
93
            for fd in self._io_wait_remove:
94
              try:
95
                poller.unregister(fd)
96
              except KeyError:
97
                pass
98
              try:
99
                del self._io_wait[fd]
100
              except KeyError:
101
                pass
102
            self._io_wait_remove = []
103

    
104
          # Add new entries
105
          if self._io_wait_add:
106
            for (owner, fd, conditions) in self._io_wait_add:
107
              self._io_wait[fd] = owner
108
              poller.register(fd, conditions)
109
            self._io_wait_add = []
110

    
111
          # Add new timers
112
          if self._timer_add:
113
            timeout_needs_update = True
114
            for timer in self._timer_add:
115
              self._timer[timer.timer_id] = timer
116
            del self._timer_add[:]
117

    
118
          # Remove timers
119
          if self._timer_remove:
120
            timeout_needs_update = True
121
            for timer_id in self._timer_remove:
122
              try:
123
                del self._timer[timer_id]
124
              except KeyError:
125
                pass
126
            del self._timer_remove[:]
127

    
128
          # Stop if nothing is listening anymore
129
          if stop_on_empty and not (self._io_wait or self._timer):
130
            break
131

    
132
          # Calculate timeout again if required
133
          if timeout_needs_update:
134
            timeout = self._CalcTimeout(time.time())
135

    
136
          # Wait for I/O events
137
          try:
138
            io_events = poller.poll(timeout)
139
          except select.error, err:
140
            # EINTR can happen when signals are sent
141
            if err.args and err.args[0] in (errno.EINTR,):
142
              io_events = None
143
            else:
144
              raise
145

    
146
          after_poll = time.time()
147

    
148
          if io_events:
149
            # Check for I/O events
150
            for (evfd, evcond) in io_events:
151
              owner = self._io_wait.get(evfd, None)
152
              if owner:
153
                owner.OnIO(evfd, evcond)
154

    
155
          if self._timer:
156
            self._CheckTimers(after_poll)
157

    
158
          # Check whether signal was raised
159
          if sigchld_handler and sigchld_handler.called:
160
            self._CallSignalWaiters(signal.SIGCHLD)
161
            sigchld_handler.Clear()
162

    
163
          if sigterm_handler and sigterm_handler.called:
164
            self._CallSignalWaiters(signal.SIGTERM)
165
            running = False
166
            sigterm_handler.Clear()
167
      finally:
168
        # Restore signal handlers
169
        if sigterm_handler:
170
          sigterm_handler.Reset()
171
    finally:
172
      if sigchld_handler:
173
        sigchld_handler.Reset()
174

    
175
  def _CalcTimeout(self, now):
176
    if not self._timer:
177
      return None
178

    
179
    timeout = None
180

    
181
    # TODO: Repeating timers
182

    
183
    min_timeout = 0.001
184

    
185
    for timer in self._timer.itervalues():
186
      time_left = (timer.start + timer.interval) - now
187
      if timeout is None or time_left < timeout:
188
        timeout = time_left
189
      if timeout < 0:
190
        timeout = 0
191
        break
192
      elif timeout < min_timeout:
193
        timeout = min_timeout
194
        break
195

    
196
    return timeout * 1000.0
197

    
198
  def _CheckTimers(self, now):
199
    # TODO: Repeating timers
200
    for timer in self._timer.itervalues():
201
      if now < (timer.start + timer.interval):
202
        continue
203

    
204
      timer.owner.OnTimer(timer.timer_id)
205

    
206
      # TODO: Repeating timers should not be removed
207
      self._timer_remove.append(timer.timer_id)
208

    
209
  def _CallSignalWaiters(self, signum):
210
    """Calls all signal waiters for a certain signal.
211

212
    @type signum: int
213
    @param signum: Signal number
214

215
    """
216
    for owner in self._signal_wait:
217
      owner.OnSignal(signal.SIGCHLD)
218

    
219
  def RegisterIO(self, owner, fd, condition):
220
    """Registers a receiver for I/O notifications
221

222
    The receiver must support a "OnIO(self, fd, conditions)" function.
223

224
    @type owner: instance
225
    @param owner: Receiver
226
    @type fd: int
227
    @param fd: File descriptor
228
    @type condition: int
229
    @param condition: ORed field of conditions to be notified
230
                      (see select module)
231

232
    """
233
    # select.Poller also supports file() like objects, but we don't.
234
    assert isinstance(fd, (int, long)), \
235
      "Only integers are supported for file descriptors"
236

    
237
    self._io_wait_add.append((owner, fd, condition))
238

    
239
  def UnregisterIO(self, fd):
240
    """Unregister a file descriptor.
241

242
    It'll be unregistered the next time the mainloop checks for it.
243

244
    @type fd: int
245
    @param fd: File descriptor
246

247
    """
248
    # select.Poller also supports file() like objects, but we don't.
249
    assert isinstance(fd, (int, long)), \
250
      "Only integers are supported for file descriptors"
251

    
252
    self._io_wait_remove.append(fd)
253

    
254
  def RegisterSignal(self, owner):
255
    """Registers a receiver for signal notifications
256

257
    The receiver must support a "OnSignal(self, signum)" function.
258

259
    @type owner: instance
260
    @param owner: Receiver
261

262
    """
263
    self._signal_wait.append(owner)
264

    
265
  def AddTimer(self, owner, interval, repeat):
266
    """Add a new timer.
267

268
    The receiver must support a "OnTimer(self, timer_id)" function.
269

270
    @type owner: instance
271
    @param owner: Receiver
272
    @type interval: int or float
273
    @param interval: Timer interval in seconds
274
    @type repeat: bool
275
    @param repeat: Whether this is a repeating timer or one-off
276

277
    """
278
    # TODO: Implement repeating timers
279
    assert not repeat, "Repeating timers are not yet supported"
280

    
281
    # Get new ID
282
    self._timer_id_last += 1
283

    
284
    timer_id = self._timer_id_last
285

    
286
    self._timer_add.append(Timer(owner, timer_id, time.time(),
287
                                 float(interval), repeat))
288

    
289
    return timer_id
290

    
291
  def RemoveTimer(self, timer_id):
292
    """Removes a timer.
293

294
    @type timer_id: int
295
    @param timer_id: Timer ID
296

297
    """
298
    self._timer_remove.append(timer_id)