Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ e7b3ad26

History | View | Annotate | Download (20.8 kB)

1 821d9e43 Michael Hanselmann
#
2 821d9e43 Michael Hanselmann
#
3 821d9e43 Michael Hanselmann
4 1a8337f2 Manuel Franceschini
# Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
5 821d9e43 Michael Hanselmann
#
6 821d9e43 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 821d9e43 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 821d9e43 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 821d9e43 Michael Hanselmann
# (at your option) any later version.
10 821d9e43 Michael Hanselmann
#
11 821d9e43 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 821d9e43 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 821d9e43 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 821d9e43 Michael Hanselmann
# General Public License for more details.
15 821d9e43 Michael Hanselmann
#
16 821d9e43 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 821d9e43 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 821d9e43 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 821d9e43 Michael Hanselmann
# 02110-1301, USA.
20 821d9e43 Michael Hanselmann
21 821d9e43 Michael Hanselmann
22 821d9e43 Michael Hanselmann
"""Module with helper classes and functions for daemons"""
23 821d9e43 Michael Hanselmann
24 821d9e43 Michael Hanselmann
25 112d240d Guido Trotter
import asyncore
26 b66ab629 Guido Trotter
import asynchat
27 1e063ccd Guido Trotter
import collections
28 743b53d4 René Nussbaumer
import grp
29 3b1b0cb6 Guido Trotter
import os
30 743b53d4 René Nussbaumer
import pwd
31 821d9e43 Michael Hanselmann
import signal
32 04ccf5e9 Guido Trotter
import logging
33 a02b89cf Guido Trotter
import sched
34 a02b89cf Guido Trotter
import time
35 5f3269fc Guido Trotter
import socket
36 6ddf5c8f Guido Trotter
import select
37 c124045f Iustin Pop
import sys
38 821d9e43 Michael Hanselmann
39 821d9e43 Michael Hanselmann
from ganeti import utils
40 04ccf5e9 Guido Trotter
from ganeti import constants
41 a02b89cf Guido Trotter
from ganeti import errors
42 a744b676 Manuel Franceschini
from ganeti import netutils
43 a02b89cf Guido Trotter
44 a02b89cf Guido Trotter
45 743b53d4 René Nussbaumer
_DEFAULT_RUN_USER = "root"
46 743b53d4 René Nussbaumer
_DEFAULT_RUN_GROUP = "root"
47 743b53d4 René Nussbaumer
48 743b53d4 René Nussbaumer
49 a02b89cf Guido Trotter
class SchedulerBreakout(Exception):
50 a02b89cf Guido Trotter
  """Exception used to get out of the scheduler loop
51 a02b89cf Guido Trotter

52 a02b89cf Guido Trotter
  """
53 a02b89cf Guido Trotter
54 a02b89cf Guido Trotter
55 a02b89cf Guido Trotter
def AsyncoreDelayFunction(timeout):
56 a02b89cf Guido Trotter
  """Asyncore-compatible scheduler delay function.
57 a02b89cf Guido Trotter

58 a02b89cf Guido Trotter
  This is a delay function for sched that, rather than actually sleeping,
59 a02b89cf Guido Trotter
  executes asyncore events happening in the meantime.
60 a02b89cf Guido Trotter

61 a02b89cf Guido Trotter
  After an event has occurred, rather than returning, it raises a
62 a02b89cf Guido Trotter
  SchedulerBreakout exception, which will force the current scheduler.run()
63 a02b89cf Guido Trotter
  invocation to terminate, so that we can also check for signals. The main loop
64 a02b89cf Guido Trotter
  will then call the scheduler run again, which will allow it to actually
65 a02b89cf Guido Trotter
  process any due events.
66 a02b89cf Guido Trotter

67 a02b89cf Guido Trotter
  This is needed because scheduler.run() doesn't support a count=..., as
68 a02b89cf Guido Trotter
  asyncore loop, and the scheduler module documents throwing exceptions from
69 a02b89cf Guido Trotter
  inside the delay function as an allowed usage model.
70 a02b89cf Guido Trotter

71 a02b89cf Guido Trotter
  """
72 a02b89cf Guido Trotter
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
73 a02b89cf Guido Trotter
  raise SchedulerBreakout()
74 a02b89cf Guido Trotter
75 a02b89cf Guido Trotter
76 a02b89cf Guido Trotter
class AsyncoreScheduler(sched.scheduler):
77 a02b89cf Guido Trotter
  """Event scheduler integrated with asyncore
78 a02b89cf Guido Trotter

79 a02b89cf Guido Trotter
  """
80 a02b89cf Guido Trotter
  def __init__(self, timefunc):
81 a02b89cf Guido Trotter
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
82 821d9e43 Michael Hanselmann
83 821d9e43 Michael Hanselmann
84 b11780bb Guido Trotter
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
85 b11780bb Guido Trotter
  """Base Ganeti Asyncore Dispacher
86 b11780bb Guido Trotter

87 b11780bb Guido Trotter
  """
88 b11780bb Guido Trotter
  # this method is overriding an asyncore.dispatcher method
89 b11780bb Guido Trotter
  def handle_error(self):
90 b11780bb Guido Trotter
    """Log an error in handling any request, and proceed.
91 b11780bb Guido Trotter

92 b11780bb Guido Trotter
    """
93 b11780bb Guido Trotter
    logging.exception("Error while handling asyncore request")
94 b11780bb Guido Trotter
95 b11780bb Guido Trotter
  # this method is overriding an asyncore.dispatcher method
96 b11780bb Guido Trotter
  def writable(self):
97 b11780bb Guido Trotter
    """Most of the time we don't want to check for writability.
98 b11780bb Guido Trotter

99 b11780bb Guido Trotter
    """
100 b11780bb Guido Trotter
    return False
101 b11780bb Guido Trotter
102 b11780bb Guido Trotter
103 a4b605ae Guido Trotter
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
104 a4b605ae Guido Trotter
  """A stream server to use with asyncore.
105 a4b605ae Guido Trotter

106 a4b605ae Guido Trotter
  Each request is accepted, and then dispatched to a separate asyncore
107 a4b605ae Guido Trotter
  dispatcher to handle.
108 a4b605ae Guido Trotter

109 a4b605ae Guido Trotter
  """
110 a4b605ae Guido Trotter
111 a4b605ae Guido Trotter
  _REQUEST_QUEUE_SIZE = 5
112 a4b605ae Guido Trotter
113 a4b605ae Guido Trotter
  def __init__(self, family, address):
114 a4b605ae Guido Trotter
    """Constructor for AsyncUnixStreamSocket
115 a4b605ae Guido Trotter

116 a4b605ae Guido Trotter
    @type family: integer
117 a4b605ae Guido Trotter
    @param family: socket family (one of socket.AF_*)
118 a4b605ae Guido Trotter
    @type address: address family dependent
119 a4b605ae Guido Trotter
    @param address: address to bind the socket to
120 a4b605ae Guido Trotter

121 a4b605ae Guido Trotter
    """
122 a4b605ae Guido Trotter
    GanetiBaseAsyncoreDispatcher.__init__(self)
123 a4b605ae Guido Trotter
    self.family = family
124 a4b605ae Guido Trotter
    self.create_socket(self.family, socket.SOCK_STREAM)
125 a4b605ae Guido Trotter
    self.set_reuse_addr()
126 a4b605ae Guido Trotter
    self.bind(address)
127 a4b605ae Guido Trotter
    self.listen(self._REQUEST_QUEUE_SIZE)
128 a4b605ae Guido Trotter
129 a4b605ae Guido Trotter
  # this method is overriding an asyncore.dispatcher method
130 a4b605ae Guido Trotter
  def handle_accept(self):
131 a4b605ae Guido Trotter
    """Accept a new client connection.
132 a4b605ae Guido Trotter

133 a4b605ae Guido Trotter
    Creates a new instance of the handler class, which will use asyncore to
134 a4b605ae Guido Trotter
    serve the client.
135 a4b605ae Guido Trotter

136 a4b605ae Guido Trotter
    """
137 a4b605ae Guido Trotter
    accept_result = utils.IgnoreSignals(self.accept)
138 a4b605ae Guido Trotter
    if accept_result is not None:
139 a4b605ae Guido Trotter
      connected_socket, client_address = accept_result
140 a4b605ae Guido Trotter
      if self.family == socket.AF_UNIX:
141 a4b605ae Guido Trotter
        # override the client address, as for unix sockets nothing meaningful
142 a4b605ae Guido Trotter
        # is passed in from accept anyway
143 a744b676 Manuel Franceschini
        client_address = netutils.GetSocketCredentials(connected_socket)
144 a4b605ae Guido Trotter
      logging.info("Accepted connection from %s",
145 1a8337f2 Manuel Franceschini
                   netutils.FormatAddress(self.family, client_address))
146 a4b605ae Guido Trotter
      self.handle_connection(connected_socket, client_address)
147 a4b605ae Guido Trotter
148 a4b605ae Guido Trotter
  def handle_connection(self, connected_socket, client_address):
149 a4b605ae Guido Trotter
    """Handle an already accepted connection.
150 a4b605ae Guido Trotter

151 a4b605ae Guido Trotter
    """
152 a4b605ae Guido Trotter
    raise NotImplementedError
153 a4b605ae Guido Trotter
154 a4b605ae Guido Trotter
155 b66ab629 Guido Trotter
class AsyncTerminatedMessageStream(asynchat.async_chat):
156 b66ab629 Guido Trotter
  """A terminator separated message stream asyncore module.
157 b66ab629 Guido Trotter

158 b66ab629 Guido Trotter
  Handles a stream connection receiving messages terminated by a defined
159 b66ab629 Guido Trotter
  separator. For each complete message handle_message is called.
160 b66ab629 Guido Trotter

161 b66ab629 Guido Trotter
  """
162 37e62cb9 Guido Trotter
  def __init__(self, connected_socket, peer_address, terminator, family,
163 37e62cb9 Guido Trotter
               unhandled_limit):
164 b66ab629 Guido Trotter
    """AsyncTerminatedMessageStream constructor.
165 b66ab629 Guido Trotter

166 b66ab629 Guido Trotter
    @type connected_socket: socket.socket
167 b66ab629 Guido Trotter
    @param connected_socket: connected stream socket to receive messages from
168 b66ab629 Guido Trotter
    @param peer_address: family-specific peer address
169 b66ab629 Guido Trotter
    @type terminator: string
170 b66ab629 Guido Trotter
    @param terminator: terminator separating messages in the stream
171 b66ab629 Guido Trotter
    @type family: integer
172 b66ab629 Guido Trotter
    @param family: socket family
173 37e62cb9 Guido Trotter
    @type unhandled_limit: integer or None
174 37e62cb9 Guido Trotter
    @param unhandled_limit: maximum unanswered messages
175 b66ab629 Guido Trotter

176 b66ab629 Guido Trotter
    """
177 b66ab629 Guido Trotter
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
178 b66ab629 Guido Trotter
    # using a positional argument rather than a keyword one.
179 b66ab629 Guido Trotter
    asynchat.async_chat.__init__(self, connected_socket)
180 b66ab629 Guido Trotter
    self.connected_socket = connected_socket
181 b66ab629 Guido Trotter
    # on python 2.4 there is no "family" attribute for the socket class
182 b66ab629 Guido Trotter
    # FIXME: when we move to python 2.5 or above remove the family parameter
183 b66ab629 Guido Trotter
    #self.family = self.connected_socket.family
184 b66ab629 Guido Trotter
    self.family = family
185 b66ab629 Guido Trotter
    self.peer_address = peer_address
186 b66ab629 Guido Trotter
    self.terminator = terminator
187 37e62cb9 Guido Trotter
    self.unhandled_limit = unhandled_limit
188 b66ab629 Guido Trotter
    self.set_terminator(terminator)
189 b66ab629 Guido Trotter
    self.ibuffer = []
190 37e62cb9 Guido Trotter
    self.receive_count = 0
191 37e62cb9 Guido Trotter
    self.send_count = 0
192 1e063ccd Guido Trotter
    self.oqueue = collections.deque()
193 37e62cb9 Guido Trotter
    self.iqueue = collections.deque()
194 b66ab629 Guido Trotter
195 b66ab629 Guido Trotter
  # this method is overriding an asynchat.async_chat method
196 b66ab629 Guido Trotter
  def collect_incoming_data(self, data):
197 b66ab629 Guido Trotter
    self.ibuffer.append(data)
198 b66ab629 Guido Trotter
199 37e62cb9 Guido Trotter
  def _can_handle_message(self):
200 37e62cb9 Guido Trotter
    return (self.unhandled_limit is None or
201 37e62cb9 Guido Trotter
            (self.receive_count < self.send_count + self.unhandled_limit) and
202 37e62cb9 Guido Trotter
             not self.iqueue)
203 37e62cb9 Guido Trotter
204 b66ab629 Guido Trotter
  # this method is overriding an asynchat.async_chat method
205 b66ab629 Guido Trotter
  def found_terminator(self):
206 b66ab629 Guido Trotter
    message = "".join(self.ibuffer)
207 b66ab629 Guido Trotter
    self.ibuffer = []
208 37e62cb9 Guido Trotter
    message_id = self.receive_count
209 37e62cb9 Guido Trotter
    # We need to increase the receive_count after checking if the message can
210 37e62cb9 Guido Trotter
    # be handled, but before calling handle_message
211 37e62cb9 Guido Trotter
    can_handle = self._can_handle_message()
212 37e62cb9 Guido Trotter
    self.receive_count += 1
213 37e62cb9 Guido Trotter
    if can_handle:
214 37e62cb9 Guido Trotter
      self.handle_message(message, message_id)
215 37e62cb9 Guido Trotter
    else:
216 37e62cb9 Guido Trotter
      self.iqueue.append((message, message_id))
217 b66ab629 Guido Trotter
218 b66ab629 Guido Trotter
  def handle_message(self, message, message_id):
219 b66ab629 Guido Trotter
    """Handle a terminated message.
220 b66ab629 Guido Trotter

221 b66ab629 Guido Trotter
    @type message: string
222 b66ab629 Guido Trotter
    @param message: message to handle
223 b66ab629 Guido Trotter
    @type message_id: integer
224 b66ab629 Guido Trotter
    @param message_id: stream's message sequence number
225 b66ab629 Guido Trotter

226 b66ab629 Guido Trotter
    """
227 b66ab629 Guido Trotter
    pass
228 b66ab629 Guido Trotter
    # TODO: move this method to raise NotImplementedError
229 b66ab629 Guido Trotter
    # raise NotImplementedError
230 b66ab629 Guido Trotter
231 1e063ccd Guido Trotter
  def send_message(self, message):
232 1e063ccd Guido Trotter
    """Send a message to the remote peer. This function is thread-safe.
233 1e063ccd Guido Trotter

234 1e063ccd Guido Trotter
    @type message: string
235 1e063ccd Guido Trotter
    @param message: message to send, without the terminator
236 1e063ccd Guido Trotter

237 1e063ccd Guido Trotter
    @warning: If calling this function from a thread different than the one
238 1e063ccd Guido Trotter
    performing the main asyncore loop, remember that you have to wake that one
239 1e063ccd Guido Trotter
    up.
240 1e063ccd Guido Trotter

241 1e063ccd Guido Trotter
    """
242 1e063ccd Guido Trotter
    # If we just append the message we received to the output queue, this
243 1e063ccd Guido Trotter
    # function can be safely called by multiple threads at the same time, and
244 37e62cb9 Guido Trotter
    # we don't need locking, since deques are thread safe. handle_write in the
245 37e62cb9 Guido Trotter
    # asyncore thread will handle the next input message if there are any
246 37e62cb9 Guido Trotter
    # enqueued.
247 1e063ccd Guido Trotter
    self.oqueue.append(message)
248 1e063ccd Guido Trotter
249 1e063ccd Guido Trotter
  # this method is overriding an asyncore.dispatcher method
250 37e62cb9 Guido Trotter
  def readable(self):
251 37e62cb9 Guido Trotter
    # read from the socket if we can handle the next requests
252 37e62cb9 Guido Trotter
    return self._can_handle_message() and asynchat.async_chat.readable(self)
253 37e62cb9 Guido Trotter
254 37e62cb9 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
255 1e063ccd Guido Trotter
  def writable(self):
256 1e063ccd Guido Trotter
    # the output queue may become full just after we called writable. This only
257 1e063ccd Guido Trotter
    # works if we know we'll have something else waking us up from the select,
258 1e063ccd Guido Trotter
    # in such case, anyway.
259 1e063ccd Guido Trotter
    return asynchat.async_chat.writable(self) or self.oqueue
260 1e063ccd Guido Trotter
261 1e063ccd Guido Trotter
  # this method is overriding an asyncore.dispatcher method
262 1e063ccd Guido Trotter
  def handle_write(self):
263 1e063ccd Guido Trotter
    if self.oqueue:
264 37e62cb9 Guido Trotter
      # if we have data in the output queue, then send_message was called.
265 37e62cb9 Guido Trotter
      # this means we can process one more message from the input queue, if
266 37e62cb9 Guido Trotter
      # there are any.
267 1e063ccd Guido Trotter
      data = self.oqueue.popleft()
268 1e063ccd Guido Trotter
      self.push(data + self.terminator)
269 37e62cb9 Guido Trotter
      self.send_count += 1
270 37e62cb9 Guido Trotter
      if self.iqueue:
271 37e62cb9 Guido Trotter
        self.handle_message(*self.iqueue.popleft())
272 1e063ccd Guido Trotter
    self.initiate_send()
273 1e063ccd Guido Trotter
274 b66ab629 Guido Trotter
  def close_log(self):
275 b66ab629 Guido Trotter
    logging.info("Closing connection from %s",
276 1a8337f2 Manuel Franceschini
                 netutils.FormatAddress(self.family, self.peer_address))
277 b66ab629 Guido Trotter
    self.close()
278 b66ab629 Guido Trotter
279 b66ab629 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
280 b66ab629 Guido Trotter
  def handle_expt(self):
281 b66ab629 Guido Trotter
    self.close_log()
282 b66ab629 Guido Trotter
283 b66ab629 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
284 b66ab629 Guido Trotter
  def handle_error(self):
285 b66ab629 Guido Trotter
    """Log an error in handling any request, and proceed.
286 b66ab629 Guido Trotter

287 b66ab629 Guido Trotter
    """
288 b66ab629 Guido Trotter
    logging.exception("Error while handling asyncore request")
289 b66ab629 Guido Trotter
    self.close_log()
290 b66ab629 Guido Trotter
291 b66ab629 Guido Trotter
292 b11780bb Guido Trotter
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
293 5f3269fc Guido Trotter
  """An improved asyncore udp socket.
294 5f3269fc Guido Trotter

295 5f3269fc Guido Trotter
  """
296 d8bcfe21 Manuel Franceschini
  def __init__(self, family):
297 5f3269fc Guido Trotter
    """Constructor for AsyncUDPSocket
298 5f3269fc Guido Trotter

299 5f3269fc Guido Trotter
    """
300 b11780bb Guido Trotter
    GanetiBaseAsyncoreDispatcher.__init__(self)
301 5f3269fc Guido Trotter
    self._out_queue = []
302 d8bcfe21 Manuel Franceschini
    self._family = family
303 d8bcfe21 Manuel Franceschini
    self.create_socket(family, socket.SOCK_DGRAM)
304 5f3269fc Guido Trotter
305 5f3269fc Guido Trotter
  # this method is overriding an asyncore.dispatcher method
306 5f3269fc Guido Trotter
  def handle_connect(self):
307 5f3269fc Guido Trotter
    # Python thinks that the first udp message from a source qualifies as a
308 5f3269fc Guido Trotter
    # "connect" and further ones are part of the same connection. We beg to
309 5f3269fc Guido Trotter
    # differ and treat all messages equally.
310 5f3269fc Guido Trotter
    pass
311 5f3269fc Guido Trotter
312 5f3269fc Guido Trotter
  # this method is overriding an asyncore.dispatcher method
313 5f3269fc Guido Trotter
  def handle_read(self):
314 6e7e58b4 Guido Trotter
    recv_result = utils.IgnoreSignals(self.recvfrom,
315 6e7e58b4 Guido Trotter
                                      constants.MAX_UDP_DATA_SIZE)
316 6e7e58b4 Guido Trotter
    if recv_result is not None:
317 6e7e58b4 Guido Trotter
      payload, address = recv_result
318 d8bcfe21 Manuel Franceschini
      if self._family == socket.AF_INET6:
319 d8bcfe21 Manuel Franceschini
        # we ignore 'flow info' and 'scope id' as we don't need them
320 d8bcfe21 Manuel Franceschini
        ip, port, _, _ = address
321 d8bcfe21 Manuel Franceschini
      else:
322 d8bcfe21 Manuel Franceschini
        ip, port = address
323 d8bcfe21 Manuel Franceschini
324 6e7e58b4 Guido Trotter
      self.handle_datagram(payload, ip, port)
325 5f3269fc Guido Trotter
326 5f3269fc Guido Trotter
  def handle_datagram(self, payload, ip, port):
327 5f3269fc Guido Trotter
    """Handle an already read udp datagram
328 5f3269fc Guido Trotter

329 5f3269fc Guido Trotter
    """
330 5f3269fc Guido Trotter
    raise NotImplementedError
331 5f3269fc Guido Trotter
332 5f3269fc Guido Trotter
  # this method is overriding an asyncore.dispatcher method
333 5f3269fc Guido Trotter
  def writable(self):
334 5f3269fc Guido Trotter
    # We should check whether we can write to the socket only if we have
335 5f3269fc Guido Trotter
    # something scheduled to be written
336 5f3269fc Guido Trotter
    return bool(self._out_queue)
337 5f3269fc Guido Trotter
338 48bf6352 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
339 5f3269fc Guido Trotter
  def handle_write(self):
340 3660fcf5 Guido Trotter
    if not self._out_queue:
341 3660fcf5 Guido Trotter
      logging.error("handle_write called with empty output queue")
342 3660fcf5 Guido Trotter
      return
343 3660fcf5 Guido Trotter
    (ip, port, payload) = self._out_queue[0]
344 232144d0 Guido Trotter
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
345 3660fcf5 Guido Trotter
    self._out_queue.pop(0)
346 3660fcf5 Guido Trotter
347 5f3269fc Guido Trotter
  def enqueue_send(self, ip, port, payload):
348 5f3269fc Guido Trotter
    """Enqueue a datagram to be sent when possible
349 5f3269fc Guido Trotter

350 5f3269fc Guido Trotter
    """
351 c8eded0b Guido Trotter
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
352 c8eded0b Guido Trotter
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
353 c8eded0b Guido Trotter
                                    constants.MAX_UDP_DATA_SIZE))
354 5f3269fc Guido Trotter
    self._out_queue.append((ip, port, payload))
355 5f3269fc Guido Trotter
356 6ddf5c8f Guido Trotter
  def process_next_packet(self, timeout=0):
357 6ddf5c8f Guido Trotter
    """Process the next datagram, waiting for it if necessary.
358 6ddf5c8f Guido Trotter

359 6ddf5c8f Guido Trotter
    @type timeout: float
360 6ddf5c8f Guido Trotter
    @param timeout: how long to wait for data
361 6ddf5c8f Guido Trotter
    @rtype: boolean
362 6ddf5c8f Guido Trotter
    @return: True if some data has been handled, False otherwise
363 6ddf5c8f Guido Trotter

364 6ddf5c8f Guido Trotter
    """
365 1b429e2a Iustin Pop
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
366 1b429e2a Iustin Pop
    if result is not None and result & select.POLLIN:
367 3660fcf5 Guido Trotter
      self.handle_read()
368 6ddf5c8f Guido Trotter
      return True
369 6ddf5c8f Guido Trotter
    else:
370 6ddf5c8f Guido Trotter
      return False
371 6ddf5c8f Guido Trotter
372 5f3269fc Guido Trotter
373 495ba852 Guido Trotter
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
374 495ba852 Guido Trotter
  """A way to notify the asyncore loop that something is going on.
375 495ba852 Guido Trotter

376 495ba852 Guido Trotter
  If an asyncore daemon is multithreaded when a thread tries to push some data
377 495ba852 Guido Trotter
  to a socket, the main loop handling asynchronous requests might be sleeping
378 495ba852 Guido Trotter
  waiting on a select(). To avoid this it can create an instance of the
379 495ba852 Guido Trotter
  AsyncAwaker, which other threads can use to wake it up.
380 495ba852 Guido Trotter

381 495ba852 Guido Trotter
  """
382 495ba852 Guido Trotter
  def __init__(self, signal_fn=None):
383 495ba852 Guido Trotter
    """Constructor for AsyncAwaker
384 495ba852 Guido Trotter

385 495ba852 Guido Trotter
    @type signal_fn: function
386 495ba852 Guido Trotter
    @param signal_fn: function to call when awaken
387 495ba852 Guido Trotter

388 495ba852 Guido Trotter
    """
389 495ba852 Guido Trotter
    GanetiBaseAsyncoreDispatcher.__init__(self)
390 495ba852 Guido Trotter
    assert signal_fn == None or callable(signal_fn)
391 495ba852 Guido Trotter
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
392 495ba852 Guido Trotter
                                                          socket.SOCK_STREAM)
393 495ba852 Guido Trotter
    self.in_socket.setblocking(0)
394 b628191f Guido Trotter
    self.in_socket.shutdown(socket.SHUT_WR)
395 b628191f Guido Trotter
    self.out_socket.shutdown(socket.SHUT_RD)
396 495ba852 Guido Trotter
    self.set_socket(self.in_socket)
397 495ba852 Guido Trotter
    self.need_signal = True
398 495ba852 Guido Trotter
    self.signal_fn = signal_fn
399 495ba852 Guido Trotter
    self.connected = True
400 495ba852 Guido Trotter
401 495ba852 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
402 495ba852 Guido Trotter
  def handle_read(self):
403 495ba852 Guido Trotter
    utils.IgnoreSignals(self.recv, 4096)
404 495ba852 Guido Trotter
    if self.signal_fn:
405 495ba852 Guido Trotter
      self.signal_fn()
406 495ba852 Guido Trotter
    self.need_signal = True
407 495ba852 Guido Trotter
408 495ba852 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
409 495ba852 Guido Trotter
  def close(self):
410 495ba852 Guido Trotter
    asyncore.dispatcher.close(self)
411 495ba852 Guido Trotter
    self.out_socket.close()
412 495ba852 Guido Trotter
413 495ba852 Guido Trotter
  def signal(self):
414 495ba852 Guido Trotter
    """Signal the asyncore main loop.
415 495ba852 Guido Trotter

416 495ba852 Guido Trotter
    Any data we send here will be ignored, but it will cause the select() call
417 495ba852 Guido Trotter
    to return.
418 495ba852 Guido Trotter

419 495ba852 Guido Trotter
    """
420 495ba852 Guido Trotter
    # Yes, there is a race condition here. No, we don't care, at worst we're
421 495ba852 Guido Trotter
    # sending more than one wakeup token, which doesn't harm at all.
422 495ba852 Guido Trotter
    if self.need_signal:
423 495ba852 Guido Trotter
      self.need_signal = False
424 495ba852 Guido Trotter
      self.out_socket.send("\0")
425 495ba852 Guido Trotter
426 495ba852 Guido Trotter
427 821d9e43 Michael Hanselmann
class Mainloop(object):
428 821d9e43 Michael Hanselmann
  """Generic mainloop for daemons
429 821d9e43 Michael Hanselmann

430 69b99987 Michael Hanselmann
  @ivar scheduler: A sched.scheduler object, which can be used to register
431 69b99987 Michael Hanselmann
    timed events
432 69b99987 Michael Hanselmann

433 821d9e43 Michael Hanselmann
  """
434 821d9e43 Michael Hanselmann
  def __init__(self):
435 b14b975f Michael Hanselmann
    """Constructs a new Mainloop instance.
436 b14b975f Michael Hanselmann

437 b14b975f Michael Hanselmann
    """
438 821d9e43 Michael Hanselmann
    self._signal_wait = []
439 a02b89cf Guido Trotter
    self.scheduler = AsyncoreScheduler(time.time)
440 821d9e43 Michael Hanselmann
441 9b739173 Guido Trotter
  @utils.SignalHandled([signal.SIGCHLD])
442 9b739173 Guido Trotter
  @utils.SignalHandled([signal.SIGTERM])
443 f59dce3e Guido Trotter
  @utils.SignalHandled([signal.SIGINT])
444 69b99987 Michael Hanselmann
  def Run(self, signal_handlers=None):
445 b14b975f Michael Hanselmann
    """Runs the mainloop.
446 b14b975f Michael Hanselmann

447 9b739173 Guido Trotter
    @type signal_handlers: dict
448 9b739173 Guido Trotter
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
449 b14b975f Michael Hanselmann

450 b14b975f Michael Hanselmann
    """
451 9b739173 Guido Trotter
    assert isinstance(signal_handlers, dict) and \
452 9b739173 Guido Trotter
           len(signal_handlers) > 0, \
453 9b739173 Guido Trotter
           "Broken SignalHandled decorator"
454 9b739173 Guido Trotter
    running = True
455 9b739173 Guido Trotter
    # Start actual main loop
456 9b739173 Guido Trotter
    while running:
457 a02b89cf Guido Trotter
      if not self.scheduler.empty():
458 a02b89cf Guido Trotter
        try:
459 a02b89cf Guido Trotter
          self.scheduler.run()
460 a02b89cf Guido Trotter
        except SchedulerBreakout:
461 a02b89cf Guido Trotter
          pass
462 a02b89cf Guido Trotter
      else:
463 a02b89cf Guido Trotter
        asyncore.loop(count=1, use_poll=True)
464 9b739173 Guido Trotter
465 9b739173 Guido Trotter
      # Check whether a signal was raised
466 9b739173 Guido Trotter
      for sig in signal_handlers:
467 9b739173 Guido Trotter
        handler = signal_handlers[sig]
468 9b739173 Guido Trotter
        if handler.called:
469 9b739173 Guido Trotter
          self._CallSignalWaiters(sig)
470 f59dce3e Guido Trotter
          running = sig not in (signal.SIGTERM, signal.SIGINT)
471 9b739173 Guido Trotter
          handler.Clear()
472 a570e2a8 Guido Trotter
473 b14b975f Michael Hanselmann
  def _CallSignalWaiters(self, signum):
474 b14b975f Michael Hanselmann
    """Calls all signal waiters for a certain signal.
475 b14b975f Michael Hanselmann

476 b14b975f Michael Hanselmann
    @type signum: int
477 b14b975f Michael Hanselmann
    @param signum: Signal number
478 b14b975f Michael Hanselmann

479 b14b975f Michael Hanselmann
    """
480 b14b975f Michael Hanselmann
    for owner in self._signal_wait:
481 a9fe7232 Guido Trotter
      owner.OnSignal(signum)
482 821d9e43 Michael Hanselmann
483 821d9e43 Michael Hanselmann
  def RegisterSignal(self, owner):
484 821d9e43 Michael Hanselmann
    """Registers a receiver for signal notifications
485 821d9e43 Michael Hanselmann

486 821d9e43 Michael Hanselmann
    The receiver must support a "OnSignal(self, signum)" function.
487 821d9e43 Michael Hanselmann

488 821d9e43 Michael Hanselmann
    @type owner: instance
489 821d9e43 Michael Hanselmann
    @param owner: Receiver
490 821d9e43 Michael Hanselmann

491 821d9e43 Michael Hanselmann
    """
492 821d9e43 Michael Hanselmann
    self._signal_wait.append(owner)
493 b11c9e5c Michael Hanselmann
494 04ccf5e9 Guido Trotter
495 30dabd03 Michael Hanselmann
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
496 1c54156d Luca Bigliardi
                multithreaded=False, console_logging=False,
497 743b53d4 René Nussbaumer
                default_ssl_cert=None, default_ssl_key=None,
498 743b53d4 René Nussbaumer
                user=_DEFAULT_RUN_USER, group=_DEFAULT_RUN_GROUP):
499 04ccf5e9 Guido Trotter
  """Shared main function for daemons.
500 04ccf5e9 Guido Trotter

501 04ccf5e9 Guido Trotter
  @type daemon_name: string
502 04ccf5e9 Guido Trotter
  @param daemon_name: daemon name
503 69b99987 Michael Hanselmann
  @type optionparser: optparse.OptionParser
504 04ccf5e9 Guido Trotter
  @param optionparser: initialized optionparser with daemon-specific options
505 04ccf5e9 Guido Trotter
                       (common -f -d options will be handled by this module)
506 5a062513 Guido Trotter
  @type dirs: list of (string, integer)
507 5a062513 Guido Trotter
  @param dirs: list of directories that must be created if they don't exist,
508 5a062513 Guido Trotter
               and the permissions to be used to create them
509 04ccf5e9 Guido Trotter
  @type check_fn: function which accepts (options, args)
510 04ccf5e9 Guido Trotter
  @param check_fn: function that checks start conditions and exits if they're
511 04ccf5e9 Guido Trotter
                   not met
512 04ccf5e9 Guido Trotter
  @type exec_fn: function which accepts (options, args)
513 04ccf5e9 Guido Trotter
  @param exec_fn: function that's executed with the daemon's pid file held, and
514 04ccf5e9 Guido Trotter
                  runs the daemon itself.
515 30dabd03 Michael Hanselmann
  @type multithreaded: bool
516 30dabd03 Michael Hanselmann
  @param multithreaded: Whether the daemon uses threads
517 ff917534 Luca Bigliardi
  @type console_logging: boolean
518 ff917534 Luca Bigliardi
  @param console_logging: if True, the daemon will fall back to the system
519 ff917534 Luca Bigliardi
                          console if logging fails
520 0648750e Michael Hanselmann
  @type default_ssl_cert: string
521 0648750e Michael Hanselmann
  @param default_ssl_cert: Default SSL certificate path
522 0648750e Michael Hanselmann
  @type default_ssl_key: string
523 0648750e Michael Hanselmann
  @param default_ssl_key: Default SSL key path
524 743b53d4 René Nussbaumer
  @param user: Default user to run as
525 743b53d4 René Nussbaumer
  @type user: string
526 743b53d4 René Nussbaumer
  @param group: Default group to run as
527 743b53d4 René Nussbaumer
  @type group: string
528 04ccf5e9 Guido Trotter

529 04ccf5e9 Guido Trotter
  """
530 04ccf5e9 Guido Trotter
  optionparser.add_option("-f", "--foreground", dest="fork",
531 04ccf5e9 Guido Trotter
                          help="Don't detach from the current terminal",
532 04ccf5e9 Guido Trotter
                          default=True, action="store_false")
533 04ccf5e9 Guido Trotter
  optionparser.add_option("-d", "--debug", dest="debug",
534 04ccf5e9 Guido Trotter
                          help="Enable some debug messages",
535 04ccf5e9 Guido Trotter
                          default=False, action="store_true")
536 551b6283 Iustin Pop
  optionparser.add_option("--syslog", dest="syslog",
537 551b6283 Iustin Pop
                          help="Enable logging to syslog (except debug"
538 551b6283 Iustin Pop
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
539 551b6283 Iustin Pop
                          constants.SYSLOG_USAGE,
540 551b6283 Iustin Pop
                          default=constants.SYSLOG_USAGE,
541 551b6283 Iustin Pop
                          choices=["no", "yes", "only"])
542 0a71aa17 Michael Hanselmann
543 04ccf5e9 Guido Trotter
  if daemon_name in constants.DAEMONS_PORTS:
544 14f5f1b6 Manuel Franceschini
    default_bind_address = constants.IP4_ADDRESS_ANY
545 a744b676 Manuel Franceschini
    default_port = netutils.GetDaemonPort(daemon_name)
546 0a71aa17 Michael Hanselmann
547 0a71aa17 Michael Hanselmann
    # For networked daemons we allow choosing the port and bind address
548 04ccf5e9 Guido Trotter
    optionparser.add_option("-p", "--port", dest="port",
549 0a71aa17 Michael Hanselmann
                            help="Network port (default: %s)" % default_port,
550 0a71aa17 Michael Hanselmann
                            default=default_port, type="int")
551 04ccf5e9 Guido Trotter
    optionparser.add_option("-b", "--bind", dest="bind_address",
552 0a71aa17 Michael Hanselmann
                            help=("Bind address (default: %s)" %
553 0a71aa17 Michael Hanselmann
                                  default_bind_address),
554 0a71aa17 Michael Hanselmann
                            default=default_bind_address, metavar="ADDRESS")
555 04ccf5e9 Guido Trotter
556 0648750e Michael Hanselmann
  if default_ssl_key is not None and default_ssl_cert is not None:
557 3b1b0cb6 Guido Trotter
    optionparser.add_option("--no-ssl", dest="ssl",
558 3b1b0cb6 Guido Trotter
                            help="Do not secure HTTP protocol with SSL",
559 3b1b0cb6 Guido Trotter
                            default=True, action="store_false")
560 3b1b0cb6 Guido Trotter
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
561 0648750e Michael Hanselmann
                            help=("SSL key path (default: %s)" %
562 0648750e Michael Hanselmann
                                  default_ssl_key),
563 0648750e Michael Hanselmann
                            default=default_ssl_key, type="string",
564 0648750e Michael Hanselmann
                            metavar="SSL_KEY_PATH")
565 3b1b0cb6 Guido Trotter
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
566 0648750e Michael Hanselmann
                            help=("SSL certificate path (default: %s)" %
567 0648750e Michael Hanselmann
                                  default_ssl_cert),
568 0648750e Michael Hanselmann
                            default=default_ssl_cert, type="string",
569 0648750e Michael Hanselmann
                            metavar="SSL_CERT_PATH")
570 3b1b0cb6 Guido Trotter
571 30dabd03 Michael Hanselmann
  # Disable the use of fork(2) if the daemon uses threads
572 30dabd03 Michael Hanselmann
  utils.no_fork = multithreaded
573 04ccf5e9 Guido Trotter
574 04ccf5e9 Guido Trotter
  options, args = optionparser.parse_args()
575 04ccf5e9 Guido Trotter
576 0648750e Michael Hanselmann
  if getattr(options, "ssl", False):
577 0648750e Michael Hanselmann
    ssl_paths = {
578 0648750e Michael Hanselmann
      "certificate": options.ssl_cert,
579 0648750e Michael Hanselmann
      "key": options.ssl_key,
580 0648750e Michael Hanselmann
      }
581 0648750e Michael Hanselmann
582 0648750e Michael Hanselmann
    for name, path in ssl_paths.iteritems():
583 0648750e Michael Hanselmann
      if not os.path.isfile(path):
584 0648750e Michael Hanselmann
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
585 3b1b0cb6 Guido Trotter
        sys.exit(constants.EXIT_FAILURE)
586 3b1b0cb6 Guido Trotter
587 0648750e Michael Hanselmann
    # TODO: By initiating http.HttpSslParams here we would only read the files
588 0648750e Michael Hanselmann
    # once and have a proper validation (isfile returns False on directories)
589 0648750e Michael Hanselmann
    # at the same time.
590 0648750e Michael Hanselmann
591 3b1b0cb6 Guido Trotter
  if check_fn is not None:
592 3b1b0cb6 Guido Trotter
    check_fn(options, args)
593 3b1b0cb6 Guido Trotter
594 04ccf5e9 Guido Trotter
  utils.EnsureDirs(dirs)
595 04ccf5e9 Guido Trotter
596 04ccf5e9 Guido Trotter
  if options.fork:
597 743b53d4 René Nussbaumer
    try:
598 743b53d4 René Nussbaumer
      uid = pwd.getpwnam(user).pw_uid
599 743b53d4 René Nussbaumer
      gid = grp.getgrnam(group).gr_gid
600 743b53d4 René Nussbaumer
    except KeyError:
601 743b53d4 René Nussbaumer
      raise errors.ConfigurationError("User or group not existing on system:"
602 743b53d4 René Nussbaumer
                                      " %s:%s" % (user, group))
603 04ccf5e9 Guido Trotter
    utils.CloseFDs()
604 743b53d4 René Nussbaumer
    utils.Daemonize(constants.DAEMONS_LOGFILES[daemon_name], uid, gid)
605 04ccf5e9 Guido Trotter
606 04ccf5e9 Guido Trotter
  utils.WritePidFile(daemon_name)
607 04ccf5e9 Guido Trotter
  try:
608 04ccf5e9 Guido Trotter
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
609 04ccf5e9 Guido Trotter
                       debug=options.debug,
610 04ccf5e9 Guido Trotter
                       stderr_logging=not options.fork,
611 e7307f08 Michael Hanselmann
                       multithreaded=multithreaded,
612 551b6283 Iustin Pop
                       program=daemon_name,
613 ff917534 Luca Bigliardi
                       syslog=options.syslog,
614 ff917534 Luca Bigliardi
                       console_logging=console_logging)
615 099c52ad Iustin Pop
    logging.info("%s daemon startup", daemon_name)
616 04ccf5e9 Guido Trotter
    exec_fn(options, args)
617 04ccf5e9 Guido Trotter
  finally:
618 04ccf5e9 Guido Trotter
    utils.RemovePidFile(daemon_name)