Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ 4008c8ed

History | View | Annotate | Download (21.2 kB)

1 821d9e43 Michael Hanselmann
#
2 821d9e43 Michael Hanselmann
#
3 821d9e43 Michael Hanselmann
4 821d9e43 Michael Hanselmann
# Copyright (C) 2006, 2007, 2008 Google Inc.
5 821d9e43 Michael Hanselmann
#
6 821d9e43 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 821d9e43 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 821d9e43 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 821d9e43 Michael Hanselmann
# (at your option) any later version.
10 821d9e43 Michael Hanselmann
#
11 821d9e43 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 821d9e43 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 821d9e43 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 821d9e43 Michael Hanselmann
# General Public License for more details.
15 821d9e43 Michael Hanselmann
#
16 821d9e43 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 821d9e43 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 821d9e43 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 821d9e43 Michael Hanselmann
# 02110-1301, USA.
20 821d9e43 Michael Hanselmann
21 821d9e43 Michael Hanselmann
22 821d9e43 Michael Hanselmann
"""Module with helper classes and functions for daemons"""
23 821d9e43 Michael Hanselmann
24 821d9e43 Michael Hanselmann
25 112d240d Guido Trotter
import asyncore
26 b66ab629 Guido Trotter
import asynchat
27 1e063ccd Guido Trotter
import collections
28 743b53d4 René Nussbaumer
import grp
29 3b1b0cb6 Guido Trotter
import os
30 743b53d4 René Nussbaumer
import pwd
31 821d9e43 Michael Hanselmann
import signal
32 04ccf5e9 Guido Trotter
import logging
33 a02b89cf Guido Trotter
import sched
34 a02b89cf Guido Trotter
import time
35 5f3269fc Guido Trotter
import socket
36 6ddf5c8f Guido Trotter
import select
37 c124045f Iustin Pop
import sys
38 821d9e43 Michael Hanselmann
39 821d9e43 Michael Hanselmann
from ganeti import utils
40 04ccf5e9 Guido Trotter
from ganeti import constants
41 a02b89cf Guido Trotter
from ganeti import errors
42 a744b676 Manuel Franceschini
from ganeti import netutils
43 a02b89cf Guido Trotter
44 a02b89cf Guido Trotter
45 743b53d4 René Nussbaumer
_DEFAULT_RUN_USER = "root"
46 743b53d4 René Nussbaumer
_DEFAULT_RUN_GROUP = "root"
47 743b53d4 René Nussbaumer
48 743b53d4 René Nussbaumer
49 a02b89cf Guido Trotter
class SchedulerBreakout(Exception):
50 a02b89cf Guido Trotter
  """Exception used to get out of the scheduler loop
51 a02b89cf Guido Trotter

52 a02b89cf Guido Trotter
  """
53 a02b89cf Guido Trotter
54 a02b89cf Guido Trotter
55 a02b89cf Guido Trotter
def AsyncoreDelayFunction(timeout):
56 a02b89cf Guido Trotter
  """Asyncore-compatible scheduler delay function.
57 a02b89cf Guido Trotter

58 a02b89cf Guido Trotter
  This is a delay function for sched that, rather than actually sleeping,
59 a02b89cf Guido Trotter
  executes asyncore events happening in the meantime.
60 a02b89cf Guido Trotter

61 a02b89cf Guido Trotter
  After an event has occurred, rather than returning, it raises a
62 a02b89cf Guido Trotter
  SchedulerBreakout exception, which will force the current scheduler.run()
63 a02b89cf Guido Trotter
  invocation to terminate, so that we can also check for signals. The main loop
64 a02b89cf Guido Trotter
  will then call the scheduler run again, which will allow it to actually
65 a02b89cf Guido Trotter
  process any due events.
66 a02b89cf Guido Trotter

67 a02b89cf Guido Trotter
  This is needed because scheduler.run() doesn't support a count=..., as
68 a02b89cf Guido Trotter
  asyncore loop, and the scheduler module documents throwing exceptions from
69 a02b89cf Guido Trotter
  inside the delay function as an allowed usage model.
70 a02b89cf Guido Trotter

71 a02b89cf Guido Trotter
  """
72 a02b89cf Guido Trotter
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
73 a02b89cf Guido Trotter
  raise SchedulerBreakout()
74 a02b89cf Guido Trotter
75 a02b89cf Guido Trotter
76 a02b89cf Guido Trotter
class AsyncoreScheduler(sched.scheduler):
77 a02b89cf Guido Trotter
  """Event scheduler integrated with asyncore
78 a02b89cf Guido Trotter

79 a02b89cf Guido Trotter
  """
80 a02b89cf Guido Trotter
  def __init__(self, timefunc):
81 a02b89cf Guido Trotter
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
82 821d9e43 Michael Hanselmann
83 821d9e43 Michael Hanselmann
84 b11780bb Guido Trotter
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
85 b11780bb Guido Trotter
  """Base Ganeti Asyncore Dispacher
86 b11780bb Guido Trotter

87 b11780bb Guido Trotter
  """
88 b11780bb Guido Trotter
  # this method is overriding an asyncore.dispatcher method
89 b11780bb Guido Trotter
  def handle_error(self):
90 b11780bb Guido Trotter
    """Log an error in handling any request, and proceed.
91 b11780bb Guido Trotter

92 b11780bb Guido Trotter
    """
93 b11780bb Guido Trotter
    logging.exception("Error while handling asyncore request")
94 b11780bb Guido Trotter
95 b11780bb Guido Trotter
  # this method is overriding an asyncore.dispatcher method
96 b11780bb Guido Trotter
  def writable(self):
97 b11780bb Guido Trotter
    """Most of the time we don't want to check for writability.
98 b11780bb Guido Trotter

99 b11780bb Guido Trotter
    """
100 b11780bb Guido Trotter
    return False
101 b11780bb Guido Trotter
102 b11780bb Guido Trotter
103 a4b605ae Guido Trotter
def FormatAddress(family, address):
104 a4b605ae Guido Trotter
  """Format a client's address
105 a4b605ae Guido Trotter

106 a4b605ae Guido Trotter
  @type family: integer
107 a4b605ae Guido Trotter
  @param family: socket family (one of socket.AF_*)
108 a4b605ae Guido Trotter
  @type address: family specific (usually tuple)
109 a4b605ae Guido Trotter
  @param address: address, as reported by this class
110 a4b605ae Guido Trotter

111 a4b605ae Guido Trotter
  """
112 a4b605ae Guido Trotter
  if family == socket.AF_INET and len(address) == 2:
113 a4b605ae Guido Trotter
    return "%s:%d" % address
114 a4b605ae Guido Trotter
  elif family == socket.AF_UNIX and len(address) == 3:
115 a4b605ae Guido Trotter
    return "pid=%s, uid=%s, gid=%s" % address
116 a4b605ae Guido Trotter
  else:
117 a4b605ae Guido Trotter
    return str(address)
118 a4b605ae Guido Trotter
119 a4b605ae Guido Trotter
120 a4b605ae Guido Trotter
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
121 a4b605ae Guido Trotter
  """A stream server to use with asyncore.
122 a4b605ae Guido Trotter

123 a4b605ae Guido Trotter
  Each request is accepted, and then dispatched to a separate asyncore
124 a4b605ae Guido Trotter
  dispatcher to handle.
125 a4b605ae Guido Trotter

126 a4b605ae Guido Trotter
  """
127 a4b605ae Guido Trotter
128 a4b605ae Guido Trotter
  _REQUEST_QUEUE_SIZE = 5
129 a4b605ae Guido Trotter
130 a4b605ae Guido Trotter
  def __init__(self, family, address):
131 a4b605ae Guido Trotter
    """Constructor for AsyncUnixStreamSocket
132 a4b605ae Guido Trotter

133 a4b605ae Guido Trotter
    @type family: integer
134 a4b605ae Guido Trotter
    @param family: socket family (one of socket.AF_*)
135 a4b605ae Guido Trotter
    @type address: address family dependent
136 a4b605ae Guido Trotter
    @param address: address to bind the socket to
137 a4b605ae Guido Trotter

138 a4b605ae Guido Trotter
    """
139 a4b605ae Guido Trotter
    GanetiBaseAsyncoreDispatcher.__init__(self)
140 a4b605ae Guido Trotter
    self.family = family
141 a4b605ae Guido Trotter
    self.create_socket(self.family, socket.SOCK_STREAM)
142 a4b605ae Guido Trotter
    self.set_reuse_addr()
143 a4b605ae Guido Trotter
    self.bind(address)
144 a4b605ae Guido Trotter
    self.listen(self._REQUEST_QUEUE_SIZE)
145 a4b605ae Guido Trotter
146 a4b605ae Guido Trotter
  # this method is overriding an asyncore.dispatcher method
147 a4b605ae Guido Trotter
  def handle_accept(self):
148 a4b605ae Guido Trotter
    """Accept a new client connection.
149 a4b605ae Guido Trotter

150 a4b605ae Guido Trotter
    Creates a new instance of the handler class, which will use asyncore to
151 a4b605ae Guido Trotter
    serve the client.
152 a4b605ae Guido Trotter

153 a4b605ae Guido Trotter
    """
154 a4b605ae Guido Trotter
    accept_result = utils.IgnoreSignals(self.accept)
155 a4b605ae Guido Trotter
    if accept_result is not None:
156 a4b605ae Guido Trotter
      connected_socket, client_address = accept_result
157 a4b605ae Guido Trotter
      if self.family == socket.AF_UNIX:
158 a4b605ae Guido Trotter
        # override the client address, as for unix sockets nothing meaningful
159 a4b605ae Guido Trotter
        # is passed in from accept anyway
160 a744b676 Manuel Franceschini
        client_address = netutils.GetSocketCredentials(connected_socket)
161 a4b605ae Guido Trotter
      logging.info("Accepted connection from %s",
162 a4b605ae Guido Trotter
                   FormatAddress(self.family, client_address))
163 a4b605ae Guido Trotter
      self.handle_connection(connected_socket, client_address)
164 a4b605ae Guido Trotter
165 a4b605ae Guido Trotter
  def handle_connection(self, connected_socket, client_address):
166 a4b605ae Guido Trotter
    """Handle an already accepted connection.
167 a4b605ae Guido Trotter

168 a4b605ae Guido Trotter
    """
169 a4b605ae Guido Trotter
    raise NotImplementedError
170 a4b605ae Guido Trotter
171 a4b605ae Guido Trotter
172 b66ab629 Guido Trotter
class AsyncTerminatedMessageStream(asynchat.async_chat):
173 b66ab629 Guido Trotter
  """A terminator separated message stream asyncore module.
174 b66ab629 Guido Trotter

175 b66ab629 Guido Trotter
  Handles a stream connection receiving messages terminated by a defined
176 b66ab629 Guido Trotter
  separator. For each complete message handle_message is called.
177 b66ab629 Guido Trotter

178 b66ab629 Guido Trotter
  """
179 37e62cb9 Guido Trotter
  def __init__(self, connected_socket, peer_address, terminator, family,
180 37e62cb9 Guido Trotter
               unhandled_limit):
181 b66ab629 Guido Trotter
    """AsyncTerminatedMessageStream constructor.
182 b66ab629 Guido Trotter

183 b66ab629 Guido Trotter
    @type connected_socket: socket.socket
184 b66ab629 Guido Trotter
    @param connected_socket: connected stream socket to receive messages from
185 b66ab629 Guido Trotter
    @param peer_address: family-specific peer address
186 b66ab629 Guido Trotter
    @type terminator: string
187 b66ab629 Guido Trotter
    @param terminator: terminator separating messages in the stream
188 b66ab629 Guido Trotter
    @type family: integer
189 b66ab629 Guido Trotter
    @param family: socket family
190 37e62cb9 Guido Trotter
    @type unhandled_limit: integer or None
191 37e62cb9 Guido Trotter
    @param unhandled_limit: maximum unanswered messages
192 b66ab629 Guido Trotter

193 b66ab629 Guido Trotter
    """
194 b66ab629 Guido Trotter
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
195 b66ab629 Guido Trotter
    # using a positional argument rather than a keyword one.
196 b66ab629 Guido Trotter
    asynchat.async_chat.__init__(self, connected_socket)
197 b66ab629 Guido Trotter
    self.connected_socket = connected_socket
198 b66ab629 Guido Trotter
    # on python 2.4 there is no "family" attribute for the socket class
199 b66ab629 Guido Trotter
    # FIXME: when we move to python 2.5 or above remove the family parameter
200 b66ab629 Guido Trotter
    #self.family = self.connected_socket.family
201 b66ab629 Guido Trotter
    self.family = family
202 b66ab629 Guido Trotter
    self.peer_address = peer_address
203 b66ab629 Guido Trotter
    self.terminator = terminator
204 37e62cb9 Guido Trotter
    self.unhandled_limit = unhandled_limit
205 b66ab629 Guido Trotter
    self.set_terminator(terminator)
206 b66ab629 Guido Trotter
    self.ibuffer = []
207 37e62cb9 Guido Trotter
    self.receive_count = 0
208 37e62cb9 Guido Trotter
    self.send_count = 0
209 1e063ccd Guido Trotter
    self.oqueue = collections.deque()
210 37e62cb9 Guido Trotter
    self.iqueue = collections.deque()
211 b66ab629 Guido Trotter
212 b66ab629 Guido Trotter
  # this method is overriding an asynchat.async_chat method
213 b66ab629 Guido Trotter
  def collect_incoming_data(self, data):
214 b66ab629 Guido Trotter
    self.ibuffer.append(data)
215 b66ab629 Guido Trotter
216 37e62cb9 Guido Trotter
  def _can_handle_message(self):
217 37e62cb9 Guido Trotter
    return (self.unhandled_limit is None or
218 37e62cb9 Guido Trotter
            (self.receive_count < self.send_count + self.unhandled_limit) and
219 37e62cb9 Guido Trotter
             not self.iqueue)
220 37e62cb9 Guido Trotter
221 b66ab629 Guido Trotter
  # this method is overriding an asynchat.async_chat method
222 b66ab629 Guido Trotter
  def found_terminator(self):
223 b66ab629 Guido Trotter
    message = "".join(self.ibuffer)
224 b66ab629 Guido Trotter
    self.ibuffer = []
225 37e62cb9 Guido Trotter
    message_id = self.receive_count
226 37e62cb9 Guido Trotter
    # We need to increase the receive_count after checking if the message can
227 37e62cb9 Guido Trotter
    # be handled, but before calling handle_message
228 37e62cb9 Guido Trotter
    can_handle = self._can_handle_message()
229 37e62cb9 Guido Trotter
    self.receive_count += 1
230 37e62cb9 Guido Trotter
    if can_handle:
231 37e62cb9 Guido Trotter
      self.handle_message(message, message_id)
232 37e62cb9 Guido Trotter
    else:
233 37e62cb9 Guido Trotter
      self.iqueue.append((message, message_id))
234 b66ab629 Guido Trotter
235 b66ab629 Guido Trotter
  def handle_message(self, message, message_id):
236 b66ab629 Guido Trotter
    """Handle a terminated message.
237 b66ab629 Guido Trotter

238 b66ab629 Guido Trotter
    @type message: string
239 b66ab629 Guido Trotter
    @param message: message to handle
240 b66ab629 Guido Trotter
    @type message_id: integer
241 b66ab629 Guido Trotter
    @param message_id: stream's message sequence number
242 b66ab629 Guido Trotter

243 b66ab629 Guido Trotter
    """
244 b66ab629 Guido Trotter
    pass
245 b66ab629 Guido Trotter
    # TODO: move this method to raise NotImplementedError
246 b66ab629 Guido Trotter
    # raise NotImplementedError
247 b66ab629 Guido Trotter
248 1e063ccd Guido Trotter
  def send_message(self, message):
249 1e063ccd Guido Trotter
    """Send a message to the remote peer. This function is thread-safe.
250 1e063ccd Guido Trotter

251 1e063ccd Guido Trotter
    @type message: string
252 1e063ccd Guido Trotter
    @param message: message to send, without the terminator
253 1e063ccd Guido Trotter

254 1e063ccd Guido Trotter
    @warning: If calling this function from a thread different than the one
255 1e063ccd Guido Trotter
    performing the main asyncore loop, remember that you have to wake that one
256 1e063ccd Guido Trotter
    up.
257 1e063ccd Guido Trotter

258 1e063ccd Guido Trotter
    """
259 1e063ccd Guido Trotter
    # If we just append the message we received to the output queue, this
260 1e063ccd Guido Trotter
    # function can be safely called by multiple threads at the same time, and
261 37e62cb9 Guido Trotter
    # we don't need locking, since deques are thread safe. handle_write in the
262 37e62cb9 Guido Trotter
    # asyncore thread will handle the next input message if there are any
263 37e62cb9 Guido Trotter
    # enqueued.
264 1e063ccd Guido Trotter
    self.oqueue.append(message)
265 1e063ccd Guido Trotter
266 1e063ccd Guido Trotter
  # this method is overriding an asyncore.dispatcher method
267 37e62cb9 Guido Trotter
  def readable(self):
268 37e62cb9 Guido Trotter
    # read from the socket if we can handle the next requests
269 37e62cb9 Guido Trotter
    return self._can_handle_message() and asynchat.async_chat.readable(self)
270 37e62cb9 Guido Trotter
271 37e62cb9 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
272 1e063ccd Guido Trotter
  def writable(self):
273 1e063ccd Guido Trotter
    # the output queue may become full just after we called writable. This only
274 1e063ccd Guido Trotter
    # works if we know we'll have something else waking us up from the select,
275 1e063ccd Guido Trotter
    # in such case, anyway.
276 1e063ccd Guido Trotter
    return asynchat.async_chat.writable(self) or self.oqueue
277 1e063ccd Guido Trotter
278 1e063ccd Guido Trotter
  # this method is overriding an asyncore.dispatcher method
279 1e063ccd Guido Trotter
  def handle_write(self):
280 1e063ccd Guido Trotter
    if self.oqueue:
281 37e62cb9 Guido Trotter
      # if we have data in the output queue, then send_message was called.
282 37e62cb9 Guido Trotter
      # this means we can process one more message from the input queue, if
283 37e62cb9 Guido Trotter
      # there are any.
284 1e063ccd Guido Trotter
      data = self.oqueue.popleft()
285 1e063ccd Guido Trotter
      self.push(data + self.terminator)
286 37e62cb9 Guido Trotter
      self.send_count += 1
287 37e62cb9 Guido Trotter
      if self.iqueue:
288 37e62cb9 Guido Trotter
        self.handle_message(*self.iqueue.popleft())
289 1e063ccd Guido Trotter
    self.initiate_send()
290 1e063ccd Guido Trotter
291 b66ab629 Guido Trotter
  def close_log(self):
292 b66ab629 Guido Trotter
    logging.info("Closing connection from %s",
293 b66ab629 Guido Trotter
                 FormatAddress(self.family, self.peer_address))
294 b66ab629 Guido Trotter
    self.close()
295 b66ab629 Guido Trotter
296 b66ab629 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
297 b66ab629 Guido Trotter
  def handle_expt(self):
298 b66ab629 Guido Trotter
    self.close_log()
299 b66ab629 Guido Trotter
300 b66ab629 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
301 b66ab629 Guido Trotter
  def handle_error(self):
302 b66ab629 Guido Trotter
    """Log an error in handling any request, and proceed.
303 b66ab629 Guido Trotter

304 b66ab629 Guido Trotter
    """
305 b66ab629 Guido Trotter
    logging.exception("Error while handling asyncore request")
306 b66ab629 Guido Trotter
    self.close_log()
307 b66ab629 Guido Trotter
308 b66ab629 Guido Trotter
309 b11780bb Guido Trotter
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
310 5f3269fc Guido Trotter
  """An improved asyncore udp socket.
311 5f3269fc Guido Trotter

312 5f3269fc Guido Trotter
  """
313 d8bcfe21 Manuel Franceschini
  def __init__(self, family):
314 5f3269fc Guido Trotter
    """Constructor for AsyncUDPSocket
315 5f3269fc Guido Trotter

316 5f3269fc Guido Trotter
    """
317 b11780bb Guido Trotter
    GanetiBaseAsyncoreDispatcher.__init__(self)
318 5f3269fc Guido Trotter
    self._out_queue = []
319 d8bcfe21 Manuel Franceschini
    self._family = family
320 d8bcfe21 Manuel Franceschini
    self.create_socket(family, socket.SOCK_DGRAM)
321 5f3269fc Guido Trotter
322 5f3269fc Guido Trotter
  # this method is overriding an asyncore.dispatcher method
323 5f3269fc Guido Trotter
  def handle_connect(self):
324 5f3269fc Guido Trotter
    # Python thinks that the first udp message from a source qualifies as a
325 5f3269fc Guido Trotter
    # "connect" and further ones are part of the same connection. We beg to
326 5f3269fc Guido Trotter
    # differ and treat all messages equally.
327 5f3269fc Guido Trotter
    pass
328 5f3269fc Guido Trotter
329 5f3269fc Guido Trotter
  # this method is overriding an asyncore.dispatcher method
330 5f3269fc Guido Trotter
  def handle_read(self):
331 6e7e58b4 Guido Trotter
    recv_result = utils.IgnoreSignals(self.recvfrom,
332 6e7e58b4 Guido Trotter
                                      constants.MAX_UDP_DATA_SIZE)
333 6e7e58b4 Guido Trotter
    if recv_result is not None:
334 6e7e58b4 Guido Trotter
      payload, address = recv_result
335 d8bcfe21 Manuel Franceschini
      if self._family == socket.AF_INET6:
336 d8bcfe21 Manuel Franceschini
        # we ignore 'flow info' and 'scope id' as we don't need them
337 d8bcfe21 Manuel Franceschini
        ip, port, _, _ = address
338 d8bcfe21 Manuel Franceschini
      else:
339 d8bcfe21 Manuel Franceschini
        ip, port = address
340 d8bcfe21 Manuel Franceschini
341 6e7e58b4 Guido Trotter
      self.handle_datagram(payload, ip, port)
342 5f3269fc Guido Trotter
343 5f3269fc Guido Trotter
  def handle_datagram(self, payload, ip, port):
344 5f3269fc Guido Trotter
    """Handle an already read udp datagram
345 5f3269fc Guido Trotter

346 5f3269fc Guido Trotter
    """
347 5f3269fc Guido Trotter
    raise NotImplementedError
348 5f3269fc Guido Trotter
349 5f3269fc Guido Trotter
  # this method is overriding an asyncore.dispatcher method
350 5f3269fc Guido Trotter
  def writable(self):
351 5f3269fc Guido Trotter
    # We should check whether we can write to the socket only if we have
352 5f3269fc Guido Trotter
    # something scheduled to be written
353 5f3269fc Guido Trotter
    return bool(self._out_queue)
354 5f3269fc Guido Trotter
355 48bf6352 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
356 5f3269fc Guido Trotter
  def handle_write(self):
357 3660fcf5 Guido Trotter
    if not self._out_queue:
358 3660fcf5 Guido Trotter
      logging.error("handle_write called with empty output queue")
359 3660fcf5 Guido Trotter
      return
360 3660fcf5 Guido Trotter
    (ip, port, payload) = self._out_queue[0]
361 232144d0 Guido Trotter
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
362 3660fcf5 Guido Trotter
    self._out_queue.pop(0)
363 3660fcf5 Guido Trotter
364 5f3269fc Guido Trotter
  def enqueue_send(self, ip, port, payload):
365 5f3269fc Guido Trotter
    """Enqueue a datagram to be sent when possible
366 5f3269fc Guido Trotter

367 5f3269fc Guido Trotter
    """
368 c8eded0b Guido Trotter
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
369 c8eded0b Guido Trotter
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
370 c8eded0b Guido Trotter
                                    constants.MAX_UDP_DATA_SIZE))
371 5f3269fc Guido Trotter
    self._out_queue.append((ip, port, payload))
372 5f3269fc Guido Trotter
373 6ddf5c8f Guido Trotter
  def process_next_packet(self, timeout=0):
374 6ddf5c8f Guido Trotter
    """Process the next datagram, waiting for it if necessary.
375 6ddf5c8f Guido Trotter

376 6ddf5c8f Guido Trotter
    @type timeout: float
377 6ddf5c8f Guido Trotter
    @param timeout: how long to wait for data
378 6ddf5c8f Guido Trotter
    @rtype: boolean
379 6ddf5c8f Guido Trotter
    @return: True if some data has been handled, False otherwise
380 6ddf5c8f Guido Trotter

381 6ddf5c8f Guido Trotter
    """
382 1b429e2a Iustin Pop
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
383 1b429e2a Iustin Pop
    if result is not None and result & select.POLLIN:
384 3660fcf5 Guido Trotter
      self.handle_read()
385 6ddf5c8f Guido Trotter
      return True
386 6ddf5c8f Guido Trotter
    else:
387 6ddf5c8f Guido Trotter
      return False
388 6ddf5c8f Guido Trotter
389 5f3269fc Guido Trotter
390 495ba852 Guido Trotter
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
391 495ba852 Guido Trotter
  """A way to notify the asyncore loop that something is going on.
392 495ba852 Guido Trotter

393 495ba852 Guido Trotter
  If an asyncore daemon is multithreaded when a thread tries to push some data
394 495ba852 Guido Trotter
  to a socket, the main loop handling asynchronous requests might be sleeping
395 495ba852 Guido Trotter
  waiting on a select(). To avoid this it can create an instance of the
396 495ba852 Guido Trotter
  AsyncAwaker, which other threads can use to wake it up.
397 495ba852 Guido Trotter

398 495ba852 Guido Trotter
  """
399 495ba852 Guido Trotter
  def __init__(self, signal_fn=None):
400 495ba852 Guido Trotter
    """Constructor for AsyncAwaker
401 495ba852 Guido Trotter

402 495ba852 Guido Trotter
    @type signal_fn: function
403 495ba852 Guido Trotter
    @param signal_fn: function to call when awaken
404 495ba852 Guido Trotter

405 495ba852 Guido Trotter
    """
406 495ba852 Guido Trotter
    GanetiBaseAsyncoreDispatcher.__init__(self)
407 495ba852 Guido Trotter
    assert signal_fn == None or callable(signal_fn)
408 495ba852 Guido Trotter
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
409 495ba852 Guido Trotter
                                                          socket.SOCK_STREAM)
410 495ba852 Guido Trotter
    self.in_socket.setblocking(0)
411 b628191f Guido Trotter
    self.in_socket.shutdown(socket.SHUT_WR)
412 b628191f Guido Trotter
    self.out_socket.shutdown(socket.SHUT_RD)
413 495ba852 Guido Trotter
    self.set_socket(self.in_socket)
414 495ba852 Guido Trotter
    self.need_signal = True
415 495ba852 Guido Trotter
    self.signal_fn = signal_fn
416 495ba852 Guido Trotter
    self.connected = True
417 495ba852 Guido Trotter
418 495ba852 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
419 495ba852 Guido Trotter
  def handle_read(self):
420 495ba852 Guido Trotter
    utils.IgnoreSignals(self.recv, 4096)
421 495ba852 Guido Trotter
    if self.signal_fn:
422 495ba852 Guido Trotter
      self.signal_fn()
423 495ba852 Guido Trotter
    self.need_signal = True
424 495ba852 Guido Trotter
425 495ba852 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
426 495ba852 Guido Trotter
  def close(self):
427 495ba852 Guido Trotter
    asyncore.dispatcher.close(self)
428 495ba852 Guido Trotter
    self.out_socket.close()
429 495ba852 Guido Trotter
430 495ba852 Guido Trotter
  def signal(self):
431 495ba852 Guido Trotter
    """Signal the asyncore main loop.
432 495ba852 Guido Trotter

433 495ba852 Guido Trotter
    Any data we send here will be ignored, but it will cause the select() call
434 495ba852 Guido Trotter
    to return.
435 495ba852 Guido Trotter

436 495ba852 Guido Trotter
    """
437 495ba852 Guido Trotter
    # Yes, there is a race condition here. No, we don't care, at worst we're
438 495ba852 Guido Trotter
    # sending more than one wakeup token, which doesn't harm at all.
439 495ba852 Guido Trotter
    if self.need_signal:
440 495ba852 Guido Trotter
      self.need_signal = False
441 495ba852 Guido Trotter
      self.out_socket.send("\0")
442 495ba852 Guido Trotter
443 495ba852 Guido Trotter
444 821d9e43 Michael Hanselmann
class Mainloop(object):
445 821d9e43 Michael Hanselmann
  """Generic mainloop for daemons
446 821d9e43 Michael Hanselmann

447 69b99987 Michael Hanselmann
  @ivar scheduler: A sched.scheduler object, which can be used to register
448 69b99987 Michael Hanselmann
    timed events
449 69b99987 Michael Hanselmann

450 821d9e43 Michael Hanselmann
  """
451 821d9e43 Michael Hanselmann
  def __init__(self):
452 b14b975f Michael Hanselmann
    """Constructs a new Mainloop instance.
453 b14b975f Michael Hanselmann

454 b14b975f Michael Hanselmann
    """
455 821d9e43 Michael Hanselmann
    self._signal_wait = []
456 a02b89cf Guido Trotter
    self.scheduler = AsyncoreScheduler(time.time)
457 821d9e43 Michael Hanselmann
458 9b739173 Guido Trotter
  @utils.SignalHandled([signal.SIGCHLD])
459 9b739173 Guido Trotter
  @utils.SignalHandled([signal.SIGTERM])
460 f59dce3e Guido Trotter
  @utils.SignalHandled([signal.SIGINT])
461 69b99987 Michael Hanselmann
  def Run(self, signal_handlers=None):
462 b14b975f Michael Hanselmann
    """Runs the mainloop.
463 b14b975f Michael Hanselmann

464 9b739173 Guido Trotter
    @type signal_handlers: dict
465 9b739173 Guido Trotter
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
466 b14b975f Michael Hanselmann

467 b14b975f Michael Hanselmann
    """
468 9b739173 Guido Trotter
    assert isinstance(signal_handlers, dict) and \
469 9b739173 Guido Trotter
           len(signal_handlers) > 0, \
470 9b739173 Guido Trotter
           "Broken SignalHandled decorator"
471 9b739173 Guido Trotter
    running = True
472 9b739173 Guido Trotter
    # Start actual main loop
473 9b739173 Guido Trotter
    while running:
474 a02b89cf Guido Trotter
      if not self.scheduler.empty():
475 a02b89cf Guido Trotter
        try:
476 a02b89cf Guido Trotter
          self.scheduler.run()
477 a02b89cf Guido Trotter
        except SchedulerBreakout:
478 a02b89cf Guido Trotter
          pass
479 a02b89cf Guido Trotter
      else:
480 a02b89cf Guido Trotter
        asyncore.loop(count=1, use_poll=True)
481 9b739173 Guido Trotter
482 9b739173 Guido Trotter
      # Check whether a signal was raised
483 9b739173 Guido Trotter
      for sig in signal_handlers:
484 9b739173 Guido Trotter
        handler = signal_handlers[sig]
485 9b739173 Guido Trotter
        if handler.called:
486 9b739173 Guido Trotter
          self._CallSignalWaiters(sig)
487 f59dce3e Guido Trotter
          running = sig not in (signal.SIGTERM, signal.SIGINT)
488 9b739173 Guido Trotter
          handler.Clear()
489 a570e2a8 Guido Trotter
490 b14b975f Michael Hanselmann
  def _CallSignalWaiters(self, signum):
491 b14b975f Michael Hanselmann
    """Calls all signal waiters for a certain signal.
492 b14b975f Michael Hanselmann

493 b14b975f Michael Hanselmann
    @type signum: int
494 b14b975f Michael Hanselmann
    @param signum: Signal number
495 b14b975f Michael Hanselmann

496 b14b975f Michael Hanselmann
    """
497 b14b975f Michael Hanselmann
    for owner in self._signal_wait:
498 a9fe7232 Guido Trotter
      owner.OnSignal(signum)
499 821d9e43 Michael Hanselmann
500 821d9e43 Michael Hanselmann
  def RegisterSignal(self, owner):
501 821d9e43 Michael Hanselmann
    """Registers a receiver for signal notifications
502 821d9e43 Michael Hanselmann

503 821d9e43 Michael Hanselmann
    The receiver must support a "OnSignal(self, signum)" function.
504 821d9e43 Michael Hanselmann

505 821d9e43 Michael Hanselmann
    @type owner: instance
506 821d9e43 Michael Hanselmann
    @param owner: Receiver
507 821d9e43 Michael Hanselmann

508 821d9e43 Michael Hanselmann
    """
509 821d9e43 Michael Hanselmann
    self._signal_wait.append(owner)
510 b11c9e5c Michael Hanselmann
511 04ccf5e9 Guido Trotter
512 30dabd03 Michael Hanselmann
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
513 1c54156d Luca Bigliardi
                multithreaded=False, console_logging=False,
514 743b53d4 René Nussbaumer
                default_ssl_cert=None, default_ssl_key=None,
515 743b53d4 René Nussbaumer
                user=_DEFAULT_RUN_USER, group=_DEFAULT_RUN_GROUP):
516 04ccf5e9 Guido Trotter
  """Shared main function for daemons.
517 04ccf5e9 Guido Trotter

518 04ccf5e9 Guido Trotter
  @type daemon_name: string
519 04ccf5e9 Guido Trotter
  @param daemon_name: daemon name
520 69b99987 Michael Hanselmann
  @type optionparser: optparse.OptionParser
521 04ccf5e9 Guido Trotter
  @param optionparser: initialized optionparser with daemon-specific options
522 04ccf5e9 Guido Trotter
                       (common -f -d options will be handled by this module)
523 5a062513 Guido Trotter
  @type dirs: list of (string, integer)
524 5a062513 Guido Trotter
  @param dirs: list of directories that must be created if they don't exist,
525 5a062513 Guido Trotter
               and the permissions to be used to create them
526 04ccf5e9 Guido Trotter
  @type check_fn: function which accepts (options, args)
527 04ccf5e9 Guido Trotter
  @param check_fn: function that checks start conditions and exits if they're
528 04ccf5e9 Guido Trotter
                   not met
529 04ccf5e9 Guido Trotter
  @type exec_fn: function which accepts (options, args)
530 04ccf5e9 Guido Trotter
  @param exec_fn: function that's executed with the daemon's pid file held, and
531 04ccf5e9 Guido Trotter
                  runs the daemon itself.
532 30dabd03 Michael Hanselmann
  @type multithreaded: bool
533 30dabd03 Michael Hanselmann
  @param multithreaded: Whether the daemon uses threads
534 ff917534 Luca Bigliardi
  @type console_logging: boolean
535 ff917534 Luca Bigliardi
  @param console_logging: if True, the daemon will fall back to the system
536 ff917534 Luca Bigliardi
                          console if logging fails
537 0648750e Michael Hanselmann
  @type default_ssl_cert: string
538 0648750e Michael Hanselmann
  @param default_ssl_cert: Default SSL certificate path
539 0648750e Michael Hanselmann
  @type default_ssl_key: string
540 0648750e Michael Hanselmann
  @param default_ssl_key: Default SSL key path
541 743b53d4 René Nussbaumer
  @param user: Default user to run as
542 743b53d4 René Nussbaumer
  @type user: string
543 743b53d4 René Nussbaumer
  @param group: Default group to run as
544 743b53d4 René Nussbaumer
  @type group: string
545 04ccf5e9 Guido Trotter

546 04ccf5e9 Guido Trotter
  """
547 04ccf5e9 Guido Trotter
  optionparser.add_option("-f", "--foreground", dest="fork",
548 04ccf5e9 Guido Trotter
                          help="Don't detach from the current terminal",
549 04ccf5e9 Guido Trotter
                          default=True, action="store_false")
550 04ccf5e9 Guido Trotter
  optionparser.add_option("-d", "--debug", dest="debug",
551 04ccf5e9 Guido Trotter
                          help="Enable some debug messages",
552 04ccf5e9 Guido Trotter
                          default=False, action="store_true")
553 551b6283 Iustin Pop
  optionparser.add_option("--syslog", dest="syslog",
554 551b6283 Iustin Pop
                          help="Enable logging to syslog (except debug"
555 551b6283 Iustin Pop
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
556 551b6283 Iustin Pop
                          constants.SYSLOG_USAGE,
557 551b6283 Iustin Pop
                          default=constants.SYSLOG_USAGE,
558 551b6283 Iustin Pop
                          choices=["no", "yes", "only"])
559 0a71aa17 Michael Hanselmann
560 04ccf5e9 Guido Trotter
  if daemon_name in constants.DAEMONS_PORTS:
561 14f5f1b6 Manuel Franceschini
    default_bind_address = constants.IP4_ADDRESS_ANY
562 a744b676 Manuel Franceschini
    default_port = netutils.GetDaemonPort(daemon_name)
563 0a71aa17 Michael Hanselmann
564 0a71aa17 Michael Hanselmann
    # For networked daemons we allow choosing the port and bind address
565 04ccf5e9 Guido Trotter
    optionparser.add_option("-p", "--port", dest="port",
566 0a71aa17 Michael Hanselmann
                            help="Network port (default: %s)" % default_port,
567 0a71aa17 Michael Hanselmann
                            default=default_port, type="int")
568 04ccf5e9 Guido Trotter
    optionparser.add_option("-b", "--bind", dest="bind_address",
569 0a71aa17 Michael Hanselmann
                            help=("Bind address (default: %s)" %
570 0a71aa17 Michael Hanselmann
                                  default_bind_address),
571 0a71aa17 Michael Hanselmann
                            default=default_bind_address, metavar="ADDRESS")
572 04ccf5e9 Guido Trotter
573 0648750e Michael Hanselmann
  if default_ssl_key is not None and default_ssl_cert is not None:
574 3b1b0cb6 Guido Trotter
    optionparser.add_option("--no-ssl", dest="ssl",
575 3b1b0cb6 Guido Trotter
                            help="Do not secure HTTP protocol with SSL",
576 3b1b0cb6 Guido Trotter
                            default=True, action="store_false")
577 3b1b0cb6 Guido Trotter
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
578 0648750e Michael Hanselmann
                            help=("SSL key path (default: %s)" %
579 0648750e Michael Hanselmann
                                  default_ssl_key),
580 0648750e Michael Hanselmann
                            default=default_ssl_key, type="string",
581 0648750e Michael Hanselmann
                            metavar="SSL_KEY_PATH")
582 3b1b0cb6 Guido Trotter
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
583 0648750e Michael Hanselmann
                            help=("SSL certificate path (default: %s)" %
584 0648750e Michael Hanselmann
                                  default_ssl_cert),
585 0648750e Michael Hanselmann
                            default=default_ssl_cert, type="string",
586 0648750e Michael Hanselmann
                            metavar="SSL_CERT_PATH")
587 3b1b0cb6 Guido Trotter
588 30dabd03 Michael Hanselmann
  # Disable the use of fork(2) if the daemon uses threads
589 30dabd03 Michael Hanselmann
  utils.no_fork = multithreaded
590 04ccf5e9 Guido Trotter
591 04ccf5e9 Guido Trotter
  options, args = optionparser.parse_args()
592 04ccf5e9 Guido Trotter
593 0648750e Michael Hanselmann
  if getattr(options, "ssl", False):
594 0648750e Michael Hanselmann
    ssl_paths = {
595 0648750e Michael Hanselmann
      "certificate": options.ssl_cert,
596 0648750e Michael Hanselmann
      "key": options.ssl_key,
597 0648750e Michael Hanselmann
      }
598 0648750e Michael Hanselmann
599 0648750e Michael Hanselmann
    for name, path in ssl_paths.iteritems():
600 0648750e Michael Hanselmann
      if not os.path.isfile(path):
601 0648750e Michael Hanselmann
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
602 3b1b0cb6 Guido Trotter
        sys.exit(constants.EXIT_FAILURE)
603 3b1b0cb6 Guido Trotter
604 0648750e Michael Hanselmann
    # TODO: By initiating http.HttpSslParams here we would only read the files
605 0648750e Michael Hanselmann
    # once and have a proper validation (isfile returns False on directories)
606 0648750e Michael Hanselmann
    # at the same time.
607 0648750e Michael Hanselmann
608 3b1b0cb6 Guido Trotter
  if check_fn is not None:
609 3b1b0cb6 Guido Trotter
    check_fn(options, args)
610 3b1b0cb6 Guido Trotter
611 04ccf5e9 Guido Trotter
  utils.EnsureDirs(dirs)
612 04ccf5e9 Guido Trotter
613 04ccf5e9 Guido Trotter
  if options.fork:
614 743b53d4 René Nussbaumer
    try:
615 743b53d4 René Nussbaumer
      uid = pwd.getpwnam(user).pw_uid
616 743b53d4 René Nussbaumer
      gid = grp.getgrnam(group).gr_gid
617 743b53d4 René Nussbaumer
    except KeyError:
618 743b53d4 René Nussbaumer
      raise errors.ConfigurationError("User or group not existing on system:"
619 743b53d4 René Nussbaumer
                                      " %s:%s" % (user, group))
620 04ccf5e9 Guido Trotter
    utils.CloseFDs()
621 743b53d4 René Nussbaumer
    utils.Daemonize(constants.DAEMONS_LOGFILES[daemon_name], uid, gid)
622 04ccf5e9 Guido Trotter
623 04ccf5e9 Guido Trotter
  utils.WritePidFile(daemon_name)
624 04ccf5e9 Guido Trotter
  try:
625 04ccf5e9 Guido Trotter
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
626 04ccf5e9 Guido Trotter
                       debug=options.debug,
627 04ccf5e9 Guido Trotter
                       stderr_logging=not options.fork,
628 e7307f08 Michael Hanselmann
                       multithreaded=multithreaded,
629 551b6283 Iustin Pop
                       program=daemon_name,
630 ff917534 Luca Bigliardi
                       syslog=options.syslog,
631 ff917534 Luca Bigliardi
                       console_logging=console_logging)
632 099c52ad Iustin Pop
    logging.info("%s daemon startup", daemon_name)
633 04ccf5e9 Guido Trotter
    exec_fn(options, args)
634 04ccf5e9 Guido Trotter
  finally:
635 04ccf5e9 Guido Trotter
    utils.RemovePidFile(daemon_name)