Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ e7323b5e

History | View | Annotate | Download (21.2 kB)

1 821d9e43 Michael Hanselmann
#
2 821d9e43 Michael Hanselmann
#
3 821d9e43 Michael Hanselmann
4 1a8337f2 Manuel Franceschini
# Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
5 821d9e43 Michael Hanselmann
#
6 821d9e43 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 821d9e43 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 821d9e43 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 821d9e43 Michael Hanselmann
# (at your option) any later version.
10 821d9e43 Michael Hanselmann
#
11 821d9e43 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 821d9e43 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 821d9e43 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 821d9e43 Michael Hanselmann
# General Public License for more details.
15 821d9e43 Michael Hanselmann
#
16 821d9e43 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 821d9e43 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 821d9e43 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 821d9e43 Michael Hanselmann
# 02110-1301, USA.
20 821d9e43 Michael Hanselmann
21 821d9e43 Michael Hanselmann
22 821d9e43 Michael Hanselmann
"""Module with helper classes and functions for daemons"""
23 821d9e43 Michael Hanselmann
24 821d9e43 Michael Hanselmann
25 112d240d Guido Trotter
import asyncore
26 b66ab629 Guido Trotter
import asynchat
27 1e063ccd Guido Trotter
import collections
28 743b53d4 René Nussbaumer
import grp
29 3b1b0cb6 Guido Trotter
import os
30 743b53d4 René Nussbaumer
import pwd
31 821d9e43 Michael Hanselmann
import signal
32 04ccf5e9 Guido Trotter
import logging
33 a02b89cf Guido Trotter
import sched
34 a02b89cf Guido Trotter
import time
35 5f3269fc Guido Trotter
import socket
36 6ddf5c8f Guido Trotter
import select
37 c124045f Iustin Pop
import sys
38 821d9e43 Michael Hanselmann
39 821d9e43 Michael Hanselmann
from ganeti import utils
40 04ccf5e9 Guido Trotter
from ganeti import constants
41 a02b89cf Guido Trotter
from ganeti import errors
42 a744b676 Manuel Franceschini
from ganeti import netutils
43 e7323b5e Manuel Franceschini
from ganeti import ssconf
44 a02b89cf Guido Trotter
45 a02b89cf Guido Trotter
46 743b53d4 René Nussbaumer
_DEFAULT_RUN_USER = "root"
47 743b53d4 René Nussbaumer
_DEFAULT_RUN_GROUP = "root"
48 743b53d4 René Nussbaumer
49 743b53d4 René Nussbaumer
50 a02b89cf Guido Trotter
class SchedulerBreakout(Exception):
51 a02b89cf Guido Trotter
  """Exception used to get out of the scheduler loop
52 a02b89cf Guido Trotter

53 a02b89cf Guido Trotter
  """
54 a02b89cf Guido Trotter
55 a02b89cf Guido Trotter
56 a02b89cf Guido Trotter
def AsyncoreDelayFunction(timeout):
57 a02b89cf Guido Trotter
  """Asyncore-compatible scheduler delay function.
58 a02b89cf Guido Trotter

59 a02b89cf Guido Trotter
  This is a delay function for sched that, rather than actually sleeping,
60 a02b89cf Guido Trotter
  executes asyncore events happening in the meantime.
61 a02b89cf Guido Trotter

62 a02b89cf Guido Trotter
  After an event has occurred, rather than returning, it raises a
63 a02b89cf Guido Trotter
  SchedulerBreakout exception, which will force the current scheduler.run()
64 a02b89cf Guido Trotter
  invocation to terminate, so that we can also check for signals. The main loop
65 a02b89cf Guido Trotter
  will then call the scheduler run again, which will allow it to actually
66 a02b89cf Guido Trotter
  process any due events.
67 a02b89cf Guido Trotter

68 a02b89cf Guido Trotter
  This is needed because scheduler.run() doesn't support a count=..., as
69 a02b89cf Guido Trotter
  asyncore loop, and the scheduler module documents throwing exceptions from
70 a02b89cf Guido Trotter
  inside the delay function as an allowed usage model.
71 a02b89cf Guido Trotter

72 a02b89cf Guido Trotter
  """
73 a02b89cf Guido Trotter
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
74 a02b89cf Guido Trotter
  raise SchedulerBreakout()
75 a02b89cf Guido Trotter
76 a02b89cf Guido Trotter
77 a02b89cf Guido Trotter
class AsyncoreScheduler(sched.scheduler):
78 a02b89cf Guido Trotter
  """Event scheduler integrated with asyncore
79 a02b89cf Guido Trotter

80 a02b89cf Guido Trotter
  """
81 a02b89cf Guido Trotter
  def __init__(self, timefunc):
82 a02b89cf Guido Trotter
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
83 821d9e43 Michael Hanselmann
84 821d9e43 Michael Hanselmann
85 b11780bb Guido Trotter
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
86 b11780bb Guido Trotter
  """Base Ganeti Asyncore Dispacher
87 b11780bb Guido Trotter

88 b11780bb Guido Trotter
  """
89 b11780bb Guido Trotter
  # this method is overriding an asyncore.dispatcher method
90 b11780bb Guido Trotter
  def handle_error(self):
91 b11780bb Guido Trotter
    """Log an error in handling any request, and proceed.
92 b11780bb Guido Trotter

93 b11780bb Guido Trotter
    """
94 b11780bb Guido Trotter
    logging.exception("Error while handling asyncore request")
95 b11780bb Guido Trotter
96 b11780bb Guido Trotter
  # this method is overriding an asyncore.dispatcher method
97 b11780bb Guido Trotter
  def writable(self):
98 b11780bb Guido Trotter
    """Most of the time we don't want to check for writability.
99 b11780bb Guido Trotter

100 b11780bb Guido Trotter
    """
101 b11780bb Guido Trotter
    return False
102 b11780bb Guido Trotter
103 b11780bb Guido Trotter
104 a4b605ae Guido Trotter
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
105 a4b605ae Guido Trotter
  """A stream server to use with asyncore.
106 a4b605ae Guido Trotter

107 a4b605ae Guido Trotter
  Each request is accepted, and then dispatched to a separate asyncore
108 a4b605ae Guido Trotter
  dispatcher to handle.
109 a4b605ae Guido Trotter

110 a4b605ae Guido Trotter
  """
111 a4b605ae Guido Trotter
112 a4b605ae Guido Trotter
  _REQUEST_QUEUE_SIZE = 5
113 a4b605ae Guido Trotter
114 a4b605ae Guido Trotter
  def __init__(self, family, address):
115 a4b605ae Guido Trotter
    """Constructor for AsyncUnixStreamSocket
116 a4b605ae Guido Trotter

117 a4b605ae Guido Trotter
    @type family: integer
118 a4b605ae Guido Trotter
    @param family: socket family (one of socket.AF_*)
119 a4b605ae Guido Trotter
    @type address: address family dependent
120 a4b605ae Guido Trotter
    @param address: address to bind the socket to
121 a4b605ae Guido Trotter

122 a4b605ae Guido Trotter
    """
123 a4b605ae Guido Trotter
    GanetiBaseAsyncoreDispatcher.__init__(self)
124 a4b605ae Guido Trotter
    self.family = family
125 a4b605ae Guido Trotter
    self.create_socket(self.family, socket.SOCK_STREAM)
126 a4b605ae Guido Trotter
    self.set_reuse_addr()
127 a4b605ae Guido Trotter
    self.bind(address)
128 a4b605ae Guido Trotter
    self.listen(self._REQUEST_QUEUE_SIZE)
129 a4b605ae Guido Trotter
130 a4b605ae Guido Trotter
  # this method is overriding an asyncore.dispatcher method
131 a4b605ae Guido Trotter
  def handle_accept(self):
132 a4b605ae Guido Trotter
    """Accept a new client connection.
133 a4b605ae Guido Trotter

134 a4b605ae Guido Trotter
    Creates a new instance of the handler class, which will use asyncore to
135 a4b605ae Guido Trotter
    serve the client.
136 a4b605ae Guido Trotter

137 a4b605ae Guido Trotter
    """
138 a4b605ae Guido Trotter
    accept_result = utils.IgnoreSignals(self.accept)
139 a4b605ae Guido Trotter
    if accept_result is not None:
140 a4b605ae Guido Trotter
      connected_socket, client_address = accept_result
141 a4b605ae Guido Trotter
      if self.family == socket.AF_UNIX:
142 a4b605ae Guido Trotter
        # override the client address, as for unix sockets nothing meaningful
143 a4b605ae Guido Trotter
        # is passed in from accept anyway
144 a744b676 Manuel Franceschini
        client_address = netutils.GetSocketCredentials(connected_socket)
145 a4b605ae Guido Trotter
      logging.info("Accepted connection from %s",
146 1a8337f2 Manuel Franceschini
                   netutils.FormatAddress(self.family, client_address))
147 a4b605ae Guido Trotter
      self.handle_connection(connected_socket, client_address)
148 a4b605ae Guido Trotter
149 a4b605ae Guido Trotter
  def handle_connection(self, connected_socket, client_address):
150 a4b605ae Guido Trotter
    """Handle an already accepted connection.
151 a4b605ae Guido Trotter

152 a4b605ae Guido Trotter
    """
153 a4b605ae Guido Trotter
    raise NotImplementedError
154 a4b605ae Guido Trotter
155 a4b605ae Guido Trotter
156 b66ab629 Guido Trotter
class AsyncTerminatedMessageStream(asynchat.async_chat):
157 b66ab629 Guido Trotter
  """A terminator separated message stream asyncore module.
158 b66ab629 Guido Trotter

159 b66ab629 Guido Trotter
  Handles a stream connection receiving messages terminated by a defined
160 b66ab629 Guido Trotter
  separator. For each complete message handle_message is called.
161 b66ab629 Guido Trotter

162 b66ab629 Guido Trotter
  """
163 37e62cb9 Guido Trotter
  def __init__(self, connected_socket, peer_address, terminator, family,
164 37e62cb9 Guido Trotter
               unhandled_limit):
165 b66ab629 Guido Trotter
    """AsyncTerminatedMessageStream constructor.
166 b66ab629 Guido Trotter

167 b66ab629 Guido Trotter
    @type connected_socket: socket.socket
168 b66ab629 Guido Trotter
    @param connected_socket: connected stream socket to receive messages from
169 b66ab629 Guido Trotter
    @param peer_address: family-specific peer address
170 b66ab629 Guido Trotter
    @type terminator: string
171 b66ab629 Guido Trotter
    @param terminator: terminator separating messages in the stream
172 b66ab629 Guido Trotter
    @type family: integer
173 b66ab629 Guido Trotter
    @param family: socket family
174 37e62cb9 Guido Trotter
    @type unhandled_limit: integer or None
175 37e62cb9 Guido Trotter
    @param unhandled_limit: maximum unanswered messages
176 b66ab629 Guido Trotter

177 b66ab629 Guido Trotter
    """
178 b66ab629 Guido Trotter
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
179 b66ab629 Guido Trotter
    # using a positional argument rather than a keyword one.
180 b66ab629 Guido Trotter
    asynchat.async_chat.__init__(self, connected_socket)
181 b66ab629 Guido Trotter
    self.connected_socket = connected_socket
182 b66ab629 Guido Trotter
    # on python 2.4 there is no "family" attribute for the socket class
183 b66ab629 Guido Trotter
    # FIXME: when we move to python 2.5 or above remove the family parameter
184 b66ab629 Guido Trotter
    #self.family = self.connected_socket.family
185 b66ab629 Guido Trotter
    self.family = family
186 b66ab629 Guido Trotter
    self.peer_address = peer_address
187 b66ab629 Guido Trotter
    self.terminator = terminator
188 37e62cb9 Guido Trotter
    self.unhandled_limit = unhandled_limit
189 b66ab629 Guido Trotter
    self.set_terminator(terminator)
190 b66ab629 Guido Trotter
    self.ibuffer = []
191 37e62cb9 Guido Trotter
    self.receive_count = 0
192 37e62cb9 Guido Trotter
    self.send_count = 0
193 1e063ccd Guido Trotter
    self.oqueue = collections.deque()
194 37e62cb9 Guido Trotter
    self.iqueue = collections.deque()
195 b66ab629 Guido Trotter
196 b66ab629 Guido Trotter
  # this method is overriding an asynchat.async_chat method
197 b66ab629 Guido Trotter
  def collect_incoming_data(self, data):
198 b66ab629 Guido Trotter
    self.ibuffer.append(data)
199 b66ab629 Guido Trotter
200 37e62cb9 Guido Trotter
  def _can_handle_message(self):
201 37e62cb9 Guido Trotter
    return (self.unhandled_limit is None or
202 37e62cb9 Guido Trotter
            (self.receive_count < self.send_count + self.unhandled_limit) and
203 37e62cb9 Guido Trotter
             not self.iqueue)
204 37e62cb9 Guido Trotter
205 b66ab629 Guido Trotter
  # this method is overriding an asynchat.async_chat method
206 b66ab629 Guido Trotter
  def found_terminator(self):
207 b66ab629 Guido Trotter
    message = "".join(self.ibuffer)
208 b66ab629 Guido Trotter
    self.ibuffer = []
209 37e62cb9 Guido Trotter
    message_id = self.receive_count
210 37e62cb9 Guido Trotter
    # We need to increase the receive_count after checking if the message can
211 37e62cb9 Guido Trotter
    # be handled, but before calling handle_message
212 37e62cb9 Guido Trotter
    can_handle = self._can_handle_message()
213 37e62cb9 Guido Trotter
    self.receive_count += 1
214 37e62cb9 Guido Trotter
    if can_handle:
215 37e62cb9 Guido Trotter
      self.handle_message(message, message_id)
216 37e62cb9 Guido Trotter
    else:
217 37e62cb9 Guido Trotter
      self.iqueue.append((message, message_id))
218 b66ab629 Guido Trotter
219 b66ab629 Guido Trotter
  def handle_message(self, message, message_id):
220 b66ab629 Guido Trotter
    """Handle a terminated message.
221 b66ab629 Guido Trotter

222 b66ab629 Guido Trotter
    @type message: string
223 b66ab629 Guido Trotter
    @param message: message to handle
224 b66ab629 Guido Trotter
    @type message_id: integer
225 b66ab629 Guido Trotter
    @param message_id: stream's message sequence number
226 b66ab629 Guido Trotter

227 b66ab629 Guido Trotter
    """
228 b66ab629 Guido Trotter
    pass
229 b66ab629 Guido Trotter
    # TODO: move this method to raise NotImplementedError
230 b66ab629 Guido Trotter
    # raise NotImplementedError
231 b66ab629 Guido Trotter
232 1e063ccd Guido Trotter
  def send_message(self, message):
233 1e063ccd Guido Trotter
    """Send a message to the remote peer. This function is thread-safe.
234 1e063ccd Guido Trotter

235 1e063ccd Guido Trotter
    @type message: string
236 1e063ccd Guido Trotter
    @param message: message to send, without the terminator
237 1e063ccd Guido Trotter

238 1e063ccd Guido Trotter
    @warning: If calling this function from a thread different than the one
239 1e063ccd Guido Trotter
    performing the main asyncore loop, remember that you have to wake that one
240 1e063ccd Guido Trotter
    up.
241 1e063ccd Guido Trotter

242 1e063ccd Guido Trotter
    """
243 1e063ccd Guido Trotter
    # If we just append the message we received to the output queue, this
244 1e063ccd Guido Trotter
    # function can be safely called by multiple threads at the same time, and
245 37e62cb9 Guido Trotter
    # we don't need locking, since deques are thread safe. handle_write in the
246 37e62cb9 Guido Trotter
    # asyncore thread will handle the next input message if there are any
247 37e62cb9 Guido Trotter
    # enqueued.
248 1e063ccd Guido Trotter
    self.oqueue.append(message)
249 1e063ccd Guido Trotter
250 1e063ccd Guido Trotter
  # this method is overriding an asyncore.dispatcher method
251 37e62cb9 Guido Trotter
  def readable(self):
252 37e62cb9 Guido Trotter
    # read from the socket if we can handle the next requests
253 37e62cb9 Guido Trotter
    return self._can_handle_message() and asynchat.async_chat.readable(self)
254 37e62cb9 Guido Trotter
255 37e62cb9 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
256 1e063ccd Guido Trotter
  def writable(self):
257 1e063ccd Guido Trotter
    # the output queue may become full just after we called writable. This only
258 1e063ccd Guido Trotter
    # works if we know we'll have something else waking us up from the select,
259 1e063ccd Guido Trotter
    # in such case, anyway.
260 1e063ccd Guido Trotter
    return asynchat.async_chat.writable(self) or self.oqueue
261 1e063ccd Guido Trotter
262 1e063ccd Guido Trotter
  # this method is overriding an asyncore.dispatcher method
263 1e063ccd Guido Trotter
  def handle_write(self):
264 1e063ccd Guido Trotter
    if self.oqueue:
265 37e62cb9 Guido Trotter
      # if we have data in the output queue, then send_message was called.
266 37e62cb9 Guido Trotter
      # this means we can process one more message from the input queue, if
267 37e62cb9 Guido Trotter
      # there are any.
268 1e063ccd Guido Trotter
      data = self.oqueue.popleft()
269 1e063ccd Guido Trotter
      self.push(data + self.terminator)
270 37e62cb9 Guido Trotter
      self.send_count += 1
271 37e62cb9 Guido Trotter
      if self.iqueue:
272 37e62cb9 Guido Trotter
        self.handle_message(*self.iqueue.popleft())
273 1e063ccd Guido Trotter
    self.initiate_send()
274 1e063ccd Guido Trotter
275 b66ab629 Guido Trotter
  def close_log(self):
276 b66ab629 Guido Trotter
    logging.info("Closing connection from %s",
277 1a8337f2 Manuel Franceschini
                 netutils.FormatAddress(self.family, self.peer_address))
278 b66ab629 Guido Trotter
    self.close()
279 b66ab629 Guido Trotter
280 b66ab629 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
281 b66ab629 Guido Trotter
  def handle_expt(self):
282 b66ab629 Guido Trotter
    self.close_log()
283 b66ab629 Guido Trotter
284 b66ab629 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
285 b66ab629 Guido Trotter
  def handle_error(self):
286 b66ab629 Guido Trotter
    """Log an error in handling any request, and proceed.
287 b66ab629 Guido Trotter

288 b66ab629 Guido Trotter
    """
289 b66ab629 Guido Trotter
    logging.exception("Error while handling asyncore request")
290 b66ab629 Guido Trotter
    self.close_log()
291 b66ab629 Guido Trotter
292 b66ab629 Guido Trotter
293 b11780bb Guido Trotter
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
294 5f3269fc Guido Trotter
  """An improved asyncore udp socket.
295 5f3269fc Guido Trotter

296 5f3269fc Guido Trotter
  """
297 d8bcfe21 Manuel Franceschini
  def __init__(self, family):
298 5f3269fc Guido Trotter
    """Constructor for AsyncUDPSocket
299 5f3269fc Guido Trotter

300 5f3269fc Guido Trotter
    """
301 b11780bb Guido Trotter
    GanetiBaseAsyncoreDispatcher.__init__(self)
302 5f3269fc Guido Trotter
    self._out_queue = []
303 d8bcfe21 Manuel Franceschini
    self._family = family
304 d8bcfe21 Manuel Franceschini
    self.create_socket(family, socket.SOCK_DGRAM)
305 5f3269fc Guido Trotter
306 5f3269fc Guido Trotter
  # this method is overriding an asyncore.dispatcher method
307 5f3269fc Guido Trotter
  def handle_connect(self):
308 5f3269fc Guido Trotter
    # Python thinks that the first udp message from a source qualifies as a
309 5f3269fc Guido Trotter
    # "connect" and further ones are part of the same connection. We beg to
310 5f3269fc Guido Trotter
    # differ and treat all messages equally.
311 5f3269fc Guido Trotter
    pass
312 5f3269fc Guido Trotter
313 5f3269fc Guido Trotter
  # this method is overriding an asyncore.dispatcher method
314 5f3269fc Guido Trotter
  def handle_read(self):
315 6e7e58b4 Guido Trotter
    recv_result = utils.IgnoreSignals(self.recvfrom,
316 6e7e58b4 Guido Trotter
                                      constants.MAX_UDP_DATA_SIZE)
317 6e7e58b4 Guido Trotter
    if recv_result is not None:
318 6e7e58b4 Guido Trotter
      payload, address = recv_result
319 d8bcfe21 Manuel Franceschini
      if self._family == socket.AF_INET6:
320 d8bcfe21 Manuel Franceschini
        # we ignore 'flow info' and 'scope id' as we don't need them
321 d8bcfe21 Manuel Franceschini
        ip, port, _, _ = address
322 d8bcfe21 Manuel Franceschini
      else:
323 d8bcfe21 Manuel Franceschini
        ip, port = address
324 d8bcfe21 Manuel Franceschini
325 6e7e58b4 Guido Trotter
      self.handle_datagram(payload, ip, port)
326 5f3269fc Guido Trotter
327 5f3269fc Guido Trotter
  def handle_datagram(self, payload, ip, port):
328 5f3269fc Guido Trotter
    """Handle an already read udp datagram
329 5f3269fc Guido Trotter

330 5f3269fc Guido Trotter
    """
331 5f3269fc Guido Trotter
    raise NotImplementedError
332 5f3269fc Guido Trotter
333 5f3269fc Guido Trotter
  # this method is overriding an asyncore.dispatcher method
334 5f3269fc Guido Trotter
  def writable(self):
335 5f3269fc Guido Trotter
    # We should check whether we can write to the socket only if we have
336 5f3269fc Guido Trotter
    # something scheduled to be written
337 5f3269fc Guido Trotter
    return bool(self._out_queue)
338 5f3269fc Guido Trotter
339 48bf6352 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
340 5f3269fc Guido Trotter
  def handle_write(self):
341 3660fcf5 Guido Trotter
    if not self._out_queue:
342 3660fcf5 Guido Trotter
      logging.error("handle_write called with empty output queue")
343 3660fcf5 Guido Trotter
      return
344 3660fcf5 Guido Trotter
    (ip, port, payload) = self._out_queue[0]
345 232144d0 Guido Trotter
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
346 3660fcf5 Guido Trotter
    self._out_queue.pop(0)
347 3660fcf5 Guido Trotter
348 5f3269fc Guido Trotter
  def enqueue_send(self, ip, port, payload):
349 5f3269fc Guido Trotter
    """Enqueue a datagram to be sent when possible
350 5f3269fc Guido Trotter

351 5f3269fc Guido Trotter
    """
352 c8eded0b Guido Trotter
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
353 c8eded0b Guido Trotter
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
354 c8eded0b Guido Trotter
                                    constants.MAX_UDP_DATA_SIZE))
355 5f3269fc Guido Trotter
    self._out_queue.append((ip, port, payload))
356 5f3269fc Guido Trotter
357 6ddf5c8f Guido Trotter
  def process_next_packet(self, timeout=0):
358 6ddf5c8f Guido Trotter
    """Process the next datagram, waiting for it if necessary.
359 6ddf5c8f Guido Trotter

360 6ddf5c8f Guido Trotter
    @type timeout: float
361 6ddf5c8f Guido Trotter
    @param timeout: how long to wait for data
362 6ddf5c8f Guido Trotter
    @rtype: boolean
363 6ddf5c8f Guido Trotter
    @return: True if some data has been handled, False otherwise
364 6ddf5c8f Guido Trotter

365 6ddf5c8f Guido Trotter
    """
366 1b429e2a Iustin Pop
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
367 1b429e2a Iustin Pop
    if result is not None and result & select.POLLIN:
368 3660fcf5 Guido Trotter
      self.handle_read()
369 6ddf5c8f Guido Trotter
      return True
370 6ddf5c8f Guido Trotter
    else:
371 6ddf5c8f Guido Trotter
      return False
372 6ddf5c8f Guido Trotter
373 5f3269fc Guido Trotter
374 495ba852 Guido Trotter
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
375 495ba852 Guido Trotter
  """A way to notify the asyncore loop that something is going on.
376 495ba852 Guido Trotter

377 495ba852 Guido Trotter
  If an asyncore daemon is multithreaded when a thread tries to push some data
378 495ba852 Guido Trotter
  to a socket, the main loop handling asynchronous requests might be sleeping
379 495ba852 Guido Trotter
  waiting on a select(). To avoid this it can create an instance of the
380 495ba852 Guido Trotter
  AsyncAwaker, which other threads can use to wake it up.
381 495ba852 Guido Trotter

382 495ba852 Guido Trotter
  """
383 495ba852 Guido Trotter
  def __init__(self, signal_fn=None):
384 495ba852 Guido Trotter
    """Constructor for AsyncAwaker
385 495ba852 Guido Trotter

386 495ba852 Guido Trotter
    @type signal_fn: function
387 495ba852 Guido Trotter
    @param signal_fn: function to call when awaken
388 495ba852 Guido Trotter

389 495ba852 Guido Trotter
    """
390 495ba852 Guido Trotter
    GanetiBaseAsyncoreDispatcher.__init__(self)
391 495ba852 Guido Trotter
    assert signal_fn == None or callable(signal_fn)
392 495ba852 Guido Trotter
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
393 495ba852 Guido Trotter
                                                          socket.SOCK_STREAM)
394 495ba852 Guido Trotter
    self.in_socket.setblocking(0)
395 b628191f Guido Trotter
    self.in_socket.shutdown(socket.SHUT_WR)
396 b628191f Guido Trotter
    self.out_socket.shutdown(socket.SHUT_RD)
397 495ba852 Guido Trotter
    self.set_socket(self.in_socket)
398 495ba852 Guido Trotter
    self.need_signal = True
399 495ba852 Guido Trotter
    self.signal_fn = signal_fn
400 495ba852 Guido Trotter
    self.connected = True
401 495ba852 Guido Trotter
402 495ba852 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
403 495ba852 Guido Trotter
  def handle_read(self):
404 495ba852 Guido Trotter
    utils.IgnoreSignals(self.recv, 4096)
405 495ba852 Guido Trotter
    if self.signal_fn:
406 495ba852 Guido Trotter
      self.signal_fn()
407 495ba852 Guido Trotter
    self.need_signal = True
408 495ba852 Guido Trotter
409 495ba852 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
410 495ba852 Guido Trotter
  def close(self):
411 495ba852 Guido Trotter
    asyncore.dispatcher.close(self)
412 495ba852 Guido Trotter
    self.out_socket.close()
413 495ba852 Guido Trotter
414 495ba852 Guido Trotter
  def signal(self):
415 495ba852 Guido Trotter
    """Signal the asyncore main loop.
416 495ba852 Guido Trotter

417 495ba852 Guido Trotter
    Any data we send here will be ignored, but it will cause the select() call
418 495ba852 Guido Trotter
    to return.
419 495ba852 Guido Trotter

420 495ba852 Guido Trotter
    """
421 495ba852 Guido Trotter
    # Yes, there is a race condition here. No, we don't care, at worst we're
422 495ba852 Guido Trotter
    # sending more than one wakeup token, which doesn't harm at all.
423 495ba852 Guido Trotter
    if self.need_signal:
424 495ba852 Guido Trotter
      self.need_signal = False
425 495ba852 Guido Trotter
      self.out_socket.send("\0")
426 495ba852 Guido Trotter
427 495ba852 Guido Trotter
428 821d9e43 Michael Hanselmann
class Mainloop(object):
429 821d9e43 Michael Hanselmann
  """Generic mainloop for daemons
430 821d9e43 Michael Hanselmann

431 69b99987 Michael Hanselmann
  @ivar scheduler: A sched.scheduler object, which can be used to register
432 69b99987 Michael Hanselmann
    timed events
433 69b99987 Michael Hanselmann

434 821d9e43 Michael Hanselmann
  """
435 821d9e43 Michael Hanselmann
  def __init__(self):
436 b14b975f Michael Hanselmann
    """Constructs a new Mainloop instance.
437 b14b975f Michael Hanselmann

438 b14b975f Michael Hanselmann
    """
439 821d9e43 Michael Hanselmann
    self._signal_wait = []
440 a02b89cf Guido Trotter
    self.scheduler = AsyncoreScheduler(time.time)
441 821d9e43 Michael Hanselmann
442 9b739173 Guido Trotter
  @utils.SignalHandled([signal.SIGCHLD])
443 9b739173 Guido Trotter
  @utils.SignalHandled([signal.SIGTERM])
444 f59dce3e Guido Trotter
  @utils.SignalHandled([signal.SIGINT])
445 69b99987 Michael Hanselmann
  def Run(self, signal_handlers=None):
446 b14b975f Michael Hanselmann
    """Runs the mainloop.
447 b14b975f Michael Hanselmann

448 9b739173 Guido Trotter
    @type signal_handlers: dict
449 9b739173 Guido Trotter
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
450 b14b975f Michael Hanselmann

451 b14b975f Michael Hanselmann
    """
452 9b739173 Guido Trotter
    assert isinstance(signal_handlers, dict) and \
453 9b739173 Guido Trotter
           len(signal_handlers) > 0, \
454 9b739173 Guido Trotter
           "Broken SignalHandled decorator"
455 9b739173 Guido Trotter
    running = True
456 9b739173 Guido Trotter
    # Start actual main loop
457 9b739173 Guido Trotter
    while running:
458 a02b89cf Guido Trotter
      if not self.scheduler.empty():
459 a02b89cf Guido Trotter
        try:
460 a02b89cf Guido Trotter
          self.scheduler.run()
461 a02b89cf Guido Trotter
        except SchedulerBreakout:
462 a02b89cf Guido Trotter
          pass
463 a02b89cf Guido Trotter
      else:
464 a02b89cf Guido Trotter
        asyncore.loop(count=1, use_poll=True)
465 9b739173 Guido Trotter
466 9b739173 Guido Trotter
      # Check whether a signal was raised
467 9b739173 Guido Trotter
      for sig in signal_handlers:
468 9b739173 Guido Trotter
        handler = signal_handlers[sig]
469 9b739173 Guido Trotter
        if handler.called:
470 9b739173 Guido Trotter
          self._CallSignalWaiters(sig)
471 f59dce3e Guido Trotter
          running = sig not in (signal.SIGTERM, signal.SIGINT)
472 9b739173 Guido Trotter
          handler.Clear()
473 a570e2a8 Guido Trotter
474 b14b975f Michael Hanselmann
  def _CallSignalWaiters(self, signum):
475 b14b975f Michael Hanselmann
    """Calls all signal waiters for a certain signal.
476 b14b975f Michael Hanselmann

477 b14b975f Michael Hanselmann
    @type signum: int
478 b14b975f Michael Hanselmann
    @param signum: Signal number
479 b14b975f Michael Hanselmann

480 b14b975f Michael Hanselmann
    """
481 b14b975f Michael Hanselmann
    for owner in self._signal_wait:
482 a9fe7232 Guido Trotter
      owner.OnSignal(signum)
483 821d9e43 Michael Hanselmann
484 821d9e43 Michael Hanselmann
  def RegisterSignal(self, owner):
485 821d9e43 Michael Hanselmann
    """Registers a receiver for signal notifications
486 821d9e43 Michael Hanselmann

487 821d9e43 Michael Hanselmann
    The receiver must support a "OnSignal(self, signum)" function.
488 821d9e43 Michael Hanselmann

489 821d9e43 Michael Hanselmann
    @type owner: instance
490 821d9e43 Michael Hanselmann
    @param owner: Receiver
491 821d9e43 Michael Hanselmann

492 821d9e43 Michael Hanselmann
    """
493 821d9e43 Michael Hanselmann
    self._signal_wait.append(owner)
494 b11c9e5c Michael Hanselmann
495 04ccf5e9 Guido Trotter
496 30dabd03 Michael Hanselmann
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
497 1c54156d Luca Bigliardi
                multithreaded=False, console_logging=False,
498 743b53d4 René Nussbaumer
                default_ssl_cert=None, default_ssl_key=None,
499 743b53d4 René Nussbaumer
                user=_DEFAULT_RUN_USER, group=_DEFAULT_RUN_GROUP):
500 04ccf5e9 Guido Trotter
  """Shared main function for daemons.
501 04ccf5e9 Guido Trotter

502 04ccf5e9 Guido Trotter
  @type daemon_name: string
503 04ccf5e9 Guido Trotter
  @param daemon_name: daemon name
504 69b99987 Michael Hanselmann
  @type optionparser: optparse.OptionParser
505 04ccf5e9 Guido Trotter
  @param optionparser: initialized optionparser with daemon-specific options
506 04ccf5e9 Guido Trotter
                       (common -f -d options will be handled by this module)
507 5a062513 Guido Trotter
  @type dirs: list of (string, integer)
508 5a062513 Guido Trotter
  @param dirs: list of directories that must be created if they don't exist,
509 5a062513 Guido Trotter
               and the permissions to be used to create them
510 04ccf5e9 Guido Trotter
  @type check_fn: function which accepts (options, args)
511 04ccf5e9 Guido Trotter
  @param check_fn: function that checks start conditions and exits if they're
512 04ccf5e9 Guido Trotter
                   not met
513 04ccf5e9 Guido Trotter
  @type exec_fn: function which accepts (options, args)
514 04ccf5e9 Guido Trotter
  @param exec_fn: function that's executed with the daemon's pid file held, and
515 04ccf5e9 Guido Trotter
                  runs the daemon itself.
516 30dabd03 Michael Hanselmann
  @type multithreaded: bool
517 30dabd03 Michael Hanselmann
  @param multithreaded: Whether the daemon uses threads
518 ff917534 Luca Bigliardi
  @type console_logging: boolean
519 ff917534 Luca Bigliardi
  @param console_logging: if True, the daemon will fall back to the system
520 ff917534 Luca Bigliardi
                          console if logging fails
521 0648750e Michael Hanselmann
  @type default_ssl_cert: string
522 0648750e Michael Hanselmann
  @param default_ssl_cert: Default SSL certificate path
523 0648750e Michael Hanselmann
  @type default_ssl_key: string
524 0648750e Michael Hanselmann
  @param default_ssl_key: Default SSL key path
525 743b53d4 René Nussbaumer
  @param user: Default user to run as
526 743b53d4 René Nussbaumer
  @type user: string
527 743b53d4 René Nussbaumer
  @param group: Default group to run as
528 743b53d4 René Nussbaumer
  @type group: string
529 04ccf5e9 Guido Trotter

530 04ccf5e9 Guido Trotter
  """
531 04ccf5e9 Guido Trotter
  optionparser.add_option("-f", "--foreground", dest="fork",
532 04ccf5e9 Guido Trotter
                          help="Don't detach from the current terminal",
533 04ccf5e9 Guido Trotter
                          default=True, action="store_false")
534 04ccf5e9 Guido Trotter
  optionparser.add_option("-d", "--debug", dest="debug",
535 04ccf5e9 Guido Trotter
                          help="Enable some debug messages",
536 04ccf5e9 Guido Trotter
                          default=False, action="store_true")
537 551b6283 Iustin Pop
  optionparser.add_option("--syslog", dest="syslog",
538 551b6283 Iustin Pop
                          help="Enable logging to syslog (except debug"
539 551b6283 Iustin Pop
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
540 551b6283 Iustin Pop
                          constants.SYSLOG_USAGE,
541 551b6283 Iustin Pop
                          default=constants.SYSLOG_USAGE,
542 551b6283 Iustin Pop
                          choices=["no", "yes", "only"])
543 0a71aa17 Michael Hanselmann
544 04ccf5e9 Guido Trotter
  if daemon_name in constants.DAEMONS_PORTS:
545 14f5f1b6 Manuel Franceschini
    default_bind_address = constants.IP4_ADDRESS_ANY
546 e7323b5e Manuel Franceschini
    try:
547 e7323b5e Manuel Franceschini
      family = ssconf.SimpleStore().GetPrimaryIPFamily()
548 e7323b5e Manuel Franceschini
      if family == netutils.IP6Address.family:
549 e7323b5e Manuel Franceschini
        default_bind_address = constants.IP6_ADDRESS_ANY
550 e7323b5e Manuel Franceschini
    except errors.ConfigurationError:
551 e7323b5e Manuel Franceschini
      # This case occurs when adding a node, as there is no ssconf available
552 e7323b5e Manuel Franceschini
      # when noded is first started. In that case, however, the correct
553 e7323b5e Manuel Franceschini
      # bind_address must be passed
554 e7323b5e Manuel Franceschini
      pass
555 e7323b5e Manuel Franceschini
556 a744b676 Manuel Franceschini
    default_port = netutils.GetDaemonPort(daemon_name)
557 0a71aa17 Michael Hanselmann
558 0a71aa17 Michael Hanselmann
    # For networked daemons we allow choosing the port and bind address
559 04ccf5e9 Guido Trotter
    optionparser.add_option("-p", "--port", dest="port",
560 0a71aa17 Michael Hanselmann
                            help="Network port (default: %s)" % default_port,
561 0a71aa17 Michael Hanselmann
                            default=default_port, type="int")
562 04ccf5e9 Guido Trotter
    optionparser.add_option("-b", "--bind", dest="bind_address",
563 e7323b5e Manuel Franceschini
                            help=("Bind address (default: '%s')" %
564 0a71aa17 Michael Hanselmann
                                  default_bind_address),
565 0a71aa17 Michael Hanselmann
                            default=default_bind_address, metavar="ADDRESS")
566 04ccf5e9 Guido Trotter
567 0648750e Michael Hanselmann
  if default_ssl_key is not None and default_ssl_cert is not None:
568 3b1b0cb6 Guido Trotter
    optionparser.add_option("--no-ssl", dest="ssl",
569 3b1b0cb6 Guido Trotter
                            help="Do not secure HTTP protocol with SSL",
570 3b1b0cb6 Guido Trotter
                            default=True, action="store_false")
571 3b1b0cb6 Guido Trotter
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
572 0648750e Michael Hanselmann
                            help=("SSL key path (default: %s)" %
573 0648750e Michael Hanselmann
                                  default_ssl_key),
574 0648750e Michael Hanselmann
                            default=default_ssl_key, type="string",
575 0648750e Michael Hanselmann
                            metavar="SSL_KEY_PATH")
576 3b1b0cb6 Guido Trotter
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
577 0648750e Michael Hanselmann
                            help=("SSL certificate path (default: %s)" %
578 0648750e Michael Hanselmann
                                  default_ssl_cert),
579 0648750e Michael Hanselmann
                            default=default_ssl_cert, type="string",
580 0648750e Michael Hanselmann
                            metavar="SSL_CERT_PATH")
581 3b1b0cb6 Guido Trotter
582 30dabd03 Michael Hanselmann
  # Disable the use of fork(2) if the daemon uses threads
583 30dabd03 Michael Hanselmann
  utils.no_fork = multithreaded
584 04ccf5e9 Guido Trotter
585 04ccf5e9 Guido Trotter
  options, args = optionparser.parse_args()
586 04ccf5e9 Guido Trotter
587 0648750e Michael Hanselmann
  if getattr(options, "ssl", False):
588 0648750e Michael Hanselmann
    ssl_paths = {
589 0648750e Michael Hanselmann
      "certificate": options.ssl_cert,
590 0648750e Michael Hanselmann
      "key": options.ssl_key,
591 0648750e Michael Hanselmann
      }
592 0648750e Michael Hanselmann
593 0648750e Michael Hanselmann
    for name, path in ssl_paths.iteritems():
594 0648750e Michael Hanselmann
      if not os.path.isfile(path):
595 0648750e Michael Hanselmann
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
596 3b1b0cb6 Guido Trotter
        sys.exit(constants.EXIT_FAILURE)
597 3b1b0cb6 Guido Trotter
598 0648750e Michael Hanselmann
    # TODO: By initiating http.HttpSslParams here we would only read the files
599 0648750e Michael Hanselmann
    # once and have a proper validation (isfile returns False on directories)
600 0648750e Michael Hanselmann
    # at the same time.
601 0648750e Michael Hanselmann
602 3b1b0cb6 Guido Trotter
  if check_fn is not None:
603 3b1b0cb6 Guido Trotter
    check_fn(options, args)
604 3b1b0cb6 Guido Trotter
605 04ccf5e9 Guido Trotter
  utils.EnsureDirs(dirs)
606 04ccf5e9 Guido Trotter
607 04ccf5e9 Guido Trotter
  if options.fork:
608 743b53d4 René Nussbaumer
    try:
609 743b53d4 René Nussbaumer
      uid = pwd.getpwnam(user).pw_uid
610 743b53d4 René Nussbaumer
      gid = grp.getgrnam(group).gr_gid
611 743b53d4 René Nussbaumer
    except KeyError:
612 743b53d4 René Nussbaumer
      raise errors.ConfigurationError("User or group not existing on system:"
613 743b53d4 René Nussbaumer
                                      " %s:%s" % (user, group))
614 04ccf5e9 Guido Trotter
    utils.CloseFDs()
615 743b53d4 René Nussbaumer
    utils.Daemonize(constants.DAEMONS_LOGFILES[daemon_name], uid, gid)
616 04ccf5e9 Guido Trotter
617 04ccf5e9 Guido Trotter
  utils.WritePidFile(daemon_name)
618 04ccf5e9 Guido Trotter
  try:
619 04ccf5e9 Guido Trotter
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
620 04ccf5e9 Guido Trotter
                       debug=options.debug,
621 04ccf5e9 Guido Trotter
                       stderr_logging=not options.fork,
622 e7307f08 Michael Hanselmann
                       multithreaded=multithreaded,
623 551b6283 Iustin Pop
                       program=daemon_name,
624 ff917534 Luca Bigliardi
                       syslog=options.syslog,
625 ff917534 Luca Bigliardi
                       console_logging=console_logging)
626 099c52ad Iustin Pop
    logging.info("%s daemon startup", daemon_name)
627 04ccf5e9 Guido Trotter
    exec_fn(options, args)
628 04ccf5e9 Guido Trotter
  finally:
629 04ccf5e9 Guido Trotter
    utils.RemovePidFile(daemon_name)