Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ 3b1b0cb6

History | View | Annotate | Download (11.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import os
26
import select
27
import signal
28
import errno
29
import time
30
import logging
31

    
32
from ganeti import utils
33
from ganeti import constants
34

    
35

    
36
class Timer(object):
37
  def __init__(self, owner, timer_id, start, interval, repeat):
38
    self.owner = owner
39
    self.timer_id = timer_id
40
    self.start = start
41
    self.interval = interval
42
    self.repeat = repeat
43

    
44

    
45
class Mainloop(object):
46
  """Generic mainloop for daemons
47

48
  """
49
  def __init__(self):
50
    """Constructs a new Mainloop instance.
51

52
    """
53
    self._io_wait = {}
54
    self._io_wait_add = []
55
    self._io_wait_remove = []
56
    self._signal_wait = []
57
    self._timer_id_last = 0
58
    self._timer = {}
59
    self._timer_add = []
60
    self._timer_remove = []
61

    
62
  def Run(self, handle_sigchld=True, handle_sigterm=True, stop_on_empty=False):
63
    """Runs the mainloop.
64

65
    @type handle_sigchld: bool
66
    @param handle_sigchld: Whether to install handler for SIGCHLD
67
    @type handle_sigterm: bool
68
    @param handle_sigterm: Whether to install handler for SIGTERM
69
    @type stop_on_empty: bool
70
    @param stop_on_empty: Whether to stop mainloop once all I/O waiters
71
                          unregistered
72

73
    """
74
    poller = select.poll()
75

    
76
    # Setup signal handlers
77
    if handle_sigchld:
78
      sigchld_handler = utils.SignalHandler([signal.SIGCHLD])
79
    else:
80
      sigchld_handler = None
81
    try:
82
      if handle_sigterm:
83
        sigterm_handler = utils.SignalHandler([signal.SIGTERM])
84
      else:
85
        sigterm_handler = None
86

    
87
      try:
88
        running = True
89
        timeout = None
90
        timeout_needs_update = True
91

    
92
        # Start actual main loop
93
        while running:
94
          # Entries could be added again afterwards, hence removing first
95
          if self._io_wait_remove:
96
            for fd in self._io_wait_remove:
97
              try:
98
                poller.unregister(fd)
99
              except KeyError:
100
                pass
101
              try:
102
                del self._io_wait[fd]
103
              except KeyError:
104
                pass
105
            self._io_wait_remove = []
106

    
107
          # Add new entries
108
          if self._io_wait_add:
109
            for (owner, fd, conditions) in self._io_wait_add:
110
              self._io_wait[fd] = owner
111
              poller.register(fd, conditions)
112
            self._io_wait_add = []
113

    
114
          # Add new timers
115
          if self._timer_add:
116
            timeout_needs_update = True
117
            for timer in self._timer_add:
118
              self._timer[timer.timer_id] = timer
119
            del self._timer_add[:]
120

    
121
          # Remove timers
122
          if self._timer_remove:
123
            timeout_needs_update = True
124
            for timer_id in self._timer_remove:
125
              try:
126
                del self._timer[timer_id]
127
              except KeyError:
128
                pass
129
            del self._timer_remove[:]
130

    
131
          # Stop if nothing is listening anymore
132
          if stop_on_empty and not (self._io_wait or self._timer):
133
            break
134

    
135
          # Calculate timeout again if required
136
          if timeout_needs_update:
137
            timeout = self._CalcTimeout(time.time())
138
            timeout_needs_update = False
139

    
140
          # Wait for I/O events
141
          try:
142
            io_events = poller.poll(timeout)
143
          except select.error, err:
144
            # EINTR can happen when signals are sent
145
            if err.args and err.args[0] in (errno.EINTR,):
146
              io_events = None
147
            else:
148
              raise
149

    
150
          after_poll = time.time()
151

    
152
          if io_events:
153
            # Check for I/O events
154
            for (evfd, evcond) in io_events:
155
              owner = self._io_wait.get(evfd, None)
156
              if owner:
157
                owner.OnIO(evfd, evcond)
158

    
159
          if self._timer:
160
            self._CheckTimers(after_poll)
161

    
162
          # Check whether signal was raised
163
          if sigchld_handler and sigchld_handler.called:
164
            self._CallSignalWaiters(signal.SIGCHLD)
165
            sigchld_handler.Clear()
166

    
167
          if sigterm_handler and sigterm_handler.called:
168
            self._CallSignalWaiters(signal.SIGTERM)
169
            running = False
170
            sigterm_handler.Clear()
171
      finally:
172
        # Restore signal handlers
173
        if sigterm_handler:
174
          sigterm_handler.Reset()
175
    finally:
176
      if sigchld_handler:
177
        sigchld_handler.Reset()
178

    
179
  def _CalcTimeout(self, now):
180
    if not self._timer:
181
      return None
182

    
183
    timeout = None
184

    
185
    # TODO: Repeating timers
186

    
187
    min_timeout = 0.001
188

    
189
    for timer in self._timer.itervalues():
190
      time_left = (timer.start + timer.interval) - now
191
      if timeout is None or time_left < timeout:
192
        timeout = time_left
193
      if timeout < 0:
194
        timeout = 0
195
        break
196
      elif timeout < min_timeout:
197
        timeout = min_timeout
198
        break
199

    
200
    return timeout * 1000.0
201

    
202
  def _CheckTimers(self, now):
203
    # TODO: Repeating timers
204
    for timer in self._timer.itervalues():
205
      if now < (timer.start + timer.interval):
206
        continue
207

    
208
      timer.owner.OnTimer(timer.timer_id)
209

    
210
      # TODO: Repeating timers should not be removed
211
      self._timer_remove.append(timer.timer_id)
212

    
213
  def _CallSignalWaiters(self, signum):
214
    """Calls all signal waiters for a certain signal.
215

216
    @type signum: int
217
    @param signum: Signal number
218

219
    """
220
    for owner in self._signal_wait:
221
      owner.OnSignal(signal.SIGCHLD)
222

    
223
  def RegisterIO(self, owner, fd, condition):
224
    """Registers a receiver for I/O notifications
225

226
    The receiver must support a "OnIO(self, fd, conditions)" function.
227

228
    @type owner: instance
229
    @param owner: Receiver
230
    @type fd: int
231
    @param fd: File descriptor
232
    @type condition: int
233
    @param condition: ORed field of conditions to be notified
234
                      (see select module)
235

236
    """
237
    # select.Poller also supports file() like objects, but we don't.
238
    assert isinstance(fd, (int, long)), \
239
      "Only integers are supported for file descriptors"
240

    
241
    self._io_wait_add.append((owner, fd, condition))
242

    
243
  def UnregisterIO(self, fd):
244
    """Unregister a file descriptor.
245

246
    It'll be unregistered the next time the mainloop checks for it.
247

248
    @type fd: int
249
    @param fd: File descriptor
250

251
    """
252
    # select.Poller also supports file() like objects, but we don't.
253
    assert isinstance(fd, (int, long)), \
254
      "Only integers are supported for file descriptors"
255

    
256
    self._io_wait_remove.append(fd)
257

    
258
  def RegisterSignal(self, owner):
259
    """Registers a receiver for signal notifications
260

261
    The receiver must support a "OnSignal(self, signum)" function.
262

263
    @type owner: instance
264
    @param owner: Receiver
265

266
    """
267
    self._signal_wait.append(owner)
268

    
269
  def AddTimer(self, owner, interval, repeat):
270
    """Add a new timer.
271

272
    The receiver must support a "OnTimer(self, timer_id)" function.
273

274
    @type owner: instance
275
    @param owner: Receiver
276
    @type interval: int or float
277
    @param interval: Timer interval in seconds
278
    @type repeat: bool
279
    @param repeat: Whether this is a repeating timer or one-off
280

281
    """
282
    # TODO: Implement repeating timers
283
    assert not repeat, "Repeating timers are not yet supported"
284

    
285
    # Get new ID
286
    self._timer_id_last += 1
287

    
288
    timer_id = self._timer_id_last
289

    
290
    self._timer_add.append(Timer(owner, timer_id, time.time(),
291
                                 float(interval), repeat))
292

    
293
    return timer_id
294

    
295
  def RemoveTimer(self, timer_id):
296
    """Removes a timer.
297

298
    @type timer_id: int
299
    @param timer_id: Timer ID
300

301
    """
302
    self._timer_remove.append(timer_id)
303

    
304

    
305
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn):
306
  """Shared main function for daemons.
307

308
  @type daemon_name: string
309
  @param daemon_name: daemon name
310
  @type optionparser: L{optparse.OptionParser}
311
  @param optionparser: initialized optionparser with daemon-specific options
312
                       (common -f -d options will be handled by this module)
313
  @type options: object @param options: OptionParser result, should contain at
314
                 least the fork and the debug options
315
  @type dirs: list of strings
316
  @param dirs: list of directories that must exist for this daemon to work
317
  @type check_fn: function which accepts (options, args)
318
  @param check_fn: function that checks start conditions and exits if they're
319
                   not met
320
  @type exec_fn: function which accepts (options, args)
321
  @param exec_fn: function that's executed with the daemon's pid file held, and
322
                  runs the daemon itself.
323

324
  """
325
  optionparser.add_option("-f", "--foreground", dest="fork",
326
                          help="Don't detach from the current terminal",
327
                          default=True, action="store_false")
328
  optionparser.add_option("-d", "--debug", dest="debug",
329
                          help="Enable some debug messages",
330
                          default=False, action="store_true")
331
  if daemon_name in constants.DAEMONS_PORTS:
332
    # for networked daemons we also allow choosing the bind port and address.
333
    # by default we use the port provided by utils.GetDaemonPort, and bind to
334
    # 0.0.0.0 (which is represented by and empty bind address.
335
    port = utils.GetDaemonPort(daemon_name)
336
    optionparser.add_option("-p", "--port", dest="port",
337
                            help="Network port (%s default)." % port,
338
                            default=port, type="int")
339
    optionparser.add_option("-b", "--bind", dest="bind_address",
340
                            help="Bind address",
341
                            default="", metavar="ADDRESS")
342

    
343
  if daemon_name in constants.DAEMONS_SSL:
344
    default_cert, default_key = constants.DAEMONS_SSL[daemon_name]
345
    optionparser.add_option("--no-ssl", dest="ssl",
346
                            help="Do not secure HTTP protocol with SSL",
347
                            default=True, action="store_false")
348
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
349
                            help="SSL key",
350
                            default=default_key, type="string")
351
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
352
                            help="SSL certificate",
353
                            default=default_cert, type="string")
354

    
355
  multithread = utils.no_fork = daemon_name in constants.MULTITHREADED_DAEMONS
356

    
357
  options, args = optionparser.parse_args()
358

    
359
  if hasattr(options, 'ssl') and options.ssl:
360
    if not (options.ssl_cert and options.ssl_key):
361
      print >> sys.stderr, "Need key and certificate to use ssl"
362
      sys.exit(constants.EXIT_FAILURE)
363
    for fname in (options.ssl_cert, options.ssl_key):
364
      if not os.path.isfile(fname):
365
        print >> sys.stderr, "Need ssl file %s to run" % fname
366
        sys.exit(constants.EXIT_FAILURE)
367

    
368
  if check_fn is not None:
369
    check_fn(options, args)
370

    
371
  utils.EnsureDirs(dirs)
372

    
373
  if options.fork:
374
    utils.CloseFDs()
375
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
376

    
377
  utils.WritePidFile(daemon_name)
378
  try:
379
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
380
                       debug=options.debug,
381
                       stderr_logging=not options.fork,
382
                       multithreaded=multithread)
383
    logging.info("%s daemon startup" % daemon_name)
384
    exec_fn(options, args)
385
  finally:
386
    utils.RemovePidFile(daemon_name)
387