Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ 04ccf5e9

History | View | Annotate | Download (10.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import select
26
import signal
27
import errno
28
import time
29
import logging
30

    
31
from ganeti import utils
32
from ganeti import constants
33

    
34

    
35
class Timer(object):
36
  def __init__(self, owner, timer_id, start, interval, repeat):
37
    self.owner = owner
38
    self.timer_id = timer_id
39
    self.start = start
40
    self.interval = interval
41
    self.repeat = repeat
42

    
43

    
44
class Mainloop(object):
45
  """Generic mainloop for daemons
46

47
  """
48
  def __init__(self):
49
    """Constructs a new Mainloop instance.
50

51
    """
52
    self._io_wait = {}
53
    self._io_wait_add = []
54
    self._io_wait_remove = []
55
    self._signal_wait = []
56
    self._timer_id_last = 0
57
    self._timer = {}
58
    self._timer_add = []
59
    self._timer_remove = []
60

    
61
  def Run(self, handle_sigchld=True, handle_sigterm=True, stop_on_empty=False):
62
    """Runs the mainloop.
63

64
    @type handle_sigchld: bool
65
    @param handle_sigchld: Whether to install handler for SIGCHLD
66
    @type handle_sigterm: bool
67
    @param handle_sigterm: Whether to install handler for SIGTERM
68
    @type stop_on_empty: bool
69
    @param stop_on_empty: Whether to stop mainloop once all I/O waiters
70
                          unregistered
71

72
    """
73
    poller = select.poll()
74

    
75
    # Setup signal handlers
76
    if handle_sigchld:
77
      sigchld_handler = utils.SignalHandler([signal.SIGCHLD])
78
    else:
79
      sigchld_handler = None
80
    try:
81
      if handle_sigterm:
82
        sigterm_handler = utils.SignalHandler([signal.SIGTERM])
83
      else:
84
        sigterm_handler = None
85

    
86
      try:
87
        running = True
88
        timeout = None
89
        timeout_needs_update = True
90

    
91
        # Start actual main loop
92
        while running:
93
          # Entries could be added again afterwards, hence removing first
94
          if self._io_wait_remove:
95
            for fd in self._io_wait_remove:
96
              try:
97
                poller.unregister(fd)
98
              except KeyError:
99
                pass
100
              try:
101
                del self._io_wait[fd]
102
              except KeyError:
103
                pass
104
            self._io_wait_remove = []
105

    
106
          # Add new entries
107
          if self._io_wait_add:
108
            for (owner, fd, conditions) in self._io_wait_add:
109
              self._io_wait[fd] = owner
110
              poller.register(fd, conditions)
111
            self._io_wait_add = []
112

    
113
          # Add new timers
114
          if self._timer_add:
115
            timeout_needs_update = True
116
            for timer in self._timer_add:
117
              self._timer[timer.timer_id] = timer
118
            del self._timer_add[:]
119

    
120
          # Remove timers
121
          if self._timer_remove:
122
            timeout_needs_update = True
123
            for timer_id in self._timer_remove:
124
              try:
125
                del self._timer[timer_id]
126
              except KeyError:
127
                pass
128
            del self._timer_remove[:]
129

    
130
          # Stop if nothing is listening anymore
131
          if stop_on_empty and not (self._io_wait or self._timer):
132
            break
133

    
134
          # Calculate timeout again if required
135
          if timeout_needs_update:
136
            timeout = self._CalcTimeout(time.time())
137
            timeout_needs_update = False
138

    
139
          # Wait for I/O events
140
          try:
141
            io_events = poller.poll(timeout)
142
          except select.error, err:
143
            # EINTR can happen when signals are sent
144
            if err.args and err.args[0] in (errno.EINTR,):
145
              io_events = None
146
            else:
147
              raise
148

    
149
          after_poll = time.time()
150

    
151
          if io_events:
152
            # Check for I/O events
153
            for (evfd, evcond) in io_events:
154
              owner = self._io_wait.get(evfd, None)
155
              if owner:
156
                owner.OnIO(evfd, evcond)
157

    
158
          if self._timer:
159
            self._CheckTimers(after_poll)
160

    
161
          # Check whether signal was raised
162
          if sigchld_handler and sigchld_handler.called:
163
            self._CallSignalWaiters(signal.SIGCHLD)
164
            sigchld_handler.Clear()
165

    
166
          if sigterm_handler and sigterm_handler.called:
167
            self._CallSignalWaiters(signal.SIGTERM)
168
            running = False
169
            sigterm_handler.Clear()
170
      finally:
171
        # Restore signal handlers
172
        if sigterm_handler:
173
          sigterm_handler.Reset()
174
    finally:
175
      if sigchld_handler:
176
        sigchld_handler.Reset()
177

    
178
  def _CalcTimeout(self, now):
179
    if not self._timer:
180
      return None
181

    
182
    timeout = None
183

    
184
    # TODO: Repeating timers
185

    
186
    min_timeout = 0.001
187

    
188
    for timer in self._timer.itervalues():
189
      time_left = (timer.start + timer.interval) - now
190
      if timeout is None or time_left < timeout:
191
        timeout = time_left
192
      if timeout < 0:
193
        timeout = 0
194
        break
195
      elif timeout < min_timeout:
196
        timeout = min_timeout
197
        break
198

    
199
    return timeout * 1000.0
200

    
201
  def _CheckTimers(self, now):
202
    # TODO: Repeating timers
203
    for timer in self._timer.itervalues():
204
      if now < (timer.start + timer.interval):
205
        continue
206

    
207
      timer.owner.OnTimer(timer.timer_id)
208

    
209
      # TODO: Repeating timers should not be removed
210
      self._timer_remove.append(timer.timer_id)
211

    
212
  def _CallSignalWaiters(self, signum):
213
    """Calls all signal waiters for a certain signal.
214

215
    @type signum: int
216
    @param signum: Signal number
217

218
    """
219
    for owner in self._signal_wait:
220
      owner.OnSignal(signal.SIGCHLD)
221

    
222
  def RegisterIO(self, owner, fd, condition):
223
    """Registers a receiver for I/O notifications
224

225
    The receiver must support a "OnIO(self, fd, conditions)" function.
226

227
    @type owner: instance
228
    @param owner: Receiver
229
    @type fd: int
230
    @param fd: File descriptor
231
    @type condition: int
232
    @param condition: ORed field of conditions to be notified
233
                      (see select module)
234

235
    """
236
    # select.Poller also supports file() like objects, but we don't.
237
    assert isinstance(fd, (int, long)), \
238
      "Only integers are supported for file descriptors"
239

    
240
    self._io_wait_add.append((owner, fd, condition))
241

    
242
  def UnregisterIO(self, fd):
243
    """Unregister a file descriptor.
244

245
    It'll be unregistered the next time the mainloop checks for it.
246

247
    @type fd: int
248
    @param fd: File descriptor
249

250
    """
251
    # select.Poller also supports file() like objects, but we don't.
252
    assert isinstance(fd, (int, long)), \
253
      "Only integers are supported for file descriptors"
254

    
255
    self._io_wait_remove.append(fd)
256

    
257
  def RegisterSignal(self, owner):
258
    """Registers a receiver for signal notifications
259

260
    The receiver must support a "OnSignal(self, signum)" function.
261

262
    @type owner: instance
263
    @param owner: Receiver
264

265
    """
266
    self._signal_wait.append(owner)
267

    
268
  def AddTimer(self, owner, interval, repeat):
269
    """Add a new timer.
270

271
    The receiver must support a "OnTimer(self, timer_id)" function.
272

273
    @type owner: instance
274
    @param owner: Receiver
275
    @type interval: int or float
276
    @param interval: Timer interval in seconds
277
    @type repeat: bool
278
    @param repeat: Whether this is a repeating timer or one-off
279

280
    """
281
    # TODO: Implement repeating timers
282
    assert not repeat, "Repeating timers are not yet supported"
283

    
284
    # Get new ID
285
    self._timer_id_last += 1
286

    
287
    timer_id = self._timer_id_last
288

    
289
    self._timer_add.append(Timer(owner, timer_id, time.time(),
290
                                 float(interval), repeat))
291

    
292
    return timer_id
293

    
294
  def RemoveTimer(self, timer_id):
295
    """Removes a timer.
296

297
    @type timer_id: int
298
    @param timer_id: Timer ID
299

300
    """
301
    self._timer_remove.append(timer_id)
302

    
303

    
304
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn):
305
  """Shared main function for daemons.
306

307
  @type daemon_name: string
308
  @param daemon_name: daemon name
309
  @type optionparser: L{optparse.OptionParser}
310
  @param optionparser: initialized optionparser with daemon-specific options
311
                       (common -f -d options will be handled by this module)
312
  @type options: object @param options: OptionParser result, should contain at
313
                 least the fork and the debug options
314
  @type dirs: list of strings
315
  @param dirs: list of directories that must exist for this daemon to work
316
  @type check_fn: function which accepts (options, args)
317
  @param check_fn: function that checks start conditions and exits if they're
318
                   not met
319
  @type exec_fn: function which accepts (options, args)
320
  @param exec_fn: function that's executed with the daemon's pid file held, and
321
                  runs the daemon itself.
322

323
  """
324
  optionparser.add_option("-f", "--foreground", dest="fork",
325
                          help="Don't detach from the current terminal",
326
                          default=True, action="store_false")
327
  optionparser.add_option("-d", "--debug", dest="debug",
328
                          help="Enable some debug messages",
329
                          default=False, action="store_true")
330
  if daemon_name in constants.DAEMONS_PORTS:
331
    # for networked daemons we also allow choosing the bind port and address.
332
    # by default we use the port provided by utils.GetDaemonPort, and bind to
333
    # 0.0.0.0 (which is represented by and empty bind address.
334
    port = utils.GetDaemonPort(daemon_name)
335
    optionparser.add_option("-p", "--port", dest="port",
336
                            help="Network port (%s default)." % port,
337
                            default=port, type="int")
338
    optionparser.add_option("-b", "--bind", dest="bind_address",
339
                            help="Bind address",
340
                            default="", metavar="ADDRESS")
341

    
342
  multithread = utils.no_fork = daemon_name in constants.MULTITHREADED_DAEMONS
343

    
344
  options, args = optionparser.parse_args()
345

    
346
  check_fn(options, args)
347
  utils.EnsureDirs(dirs)
348

    
349
  if options.fork:
350
    utils.CloseFDs()
351
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
352

    
353
  utils.WritePidFile(daemon_name)
354
  try:
355
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
356
                       debug=options.debug,
357
                       stderr_logging=not options.fork,
358
                       multithreaded=multithread)
359
    logging.info("%s daemon startup" % daemon_name)
360
    exec_fn(options, args)
361
  finally:
362
    utils.RemovePidFile(daemon_name)
363