Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ 88b92fe3

History | View | Annotate | Download (20.4 kB)

1 821d9e43 Michael Hanselmann
#
2 821d9e43 Michael Hanselmann
#
3 821d9e43 Michael Hanselmann
4 1a8337f2 Manuel Franceschini
# Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
5 821d9e43 Michael Hanselmann
#
6 821d9e43 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 821d9e43 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 821d9e43 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 821d9e43 Michael Hanselmann
# (at your option) any later version.
10 821d9e43 Michael Hanselmann
#
11 821d9e43 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 821d9e43 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 821d9e43 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 821d9e43 Michael Hanselmann
# General Public License for more details.
15 821d9e43 Michael Hanselmann
#
16 821d9e43 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 821d9e43 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 821d9e43 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 821d9e43 Michael Hanselmann
# 02110-1301, USA.
20 821d9e43 Michael Hanselmann
21 821d9e43 Michael Hanselmann
22 821d9e43 Michael Hanselmann
"""Module with helper classes and functions for daemons"""
23 821d9e43 Michael Hanselmann
24 821d9e43 Michael Hanselmann
25 112d240d Guido Trotter
import asyncore
26 b66ab629 Guido Trotter
import asynchat
27 1e063ccd Guido Trotter
import collections
28 3b1b0cb6 Guido Trotter
import os
29 821d9e43 Michael Hanselmann
import signal
30 04ccf5e9 Guido Trotter
import logging
31 a02b89cf Guido Trotter
import sched
32 a02b89cf Guido Trotter
import time
33 5f3269fc Guido Trotter
import socket
34 6ddf5c8f Guido Trotter
import select
35 c124045f Iustin Pop
import sys
36 821d9e43 Michael Hanselmann
37 821d9e43 Michael Hanselmann
from ganeti import utils
38 04ccf5e9 Guido Trotter
from ganeti import constants
39 a02b89cf Guido Trotter
from ganeti import errors
40 a744b676 Manuel Franceschini
from ganeti import netutils
41 e7323b5e Manuel Franceschini
from ganeti import ssconf
42 a02b89cf Guido Trotter
43 a02b89cf Guido Trotter
44 a02b89cf Guido Trotter
class SchedulerBreakout(Exception):
45 a02b89cf Guido Trotter
  """Exception used to get out of the scheduler loop
46 a02b89cf Guido Trotter

47 a02b89cf Guido Trotter
  """
48 a02b89cf Guido Trotter
49 a02b89cf Guido Trotter
50 a02b89cf Guido Trotter
def AsyncoreDelayFunction(timeout):
51 a02b89cf Guido Trotter
  """Asyncore-compatible scheduler delay function.
52 a02b89cf Guido Trotter

53 a02b89cf Guido Trotter
  This is a delay function for sched that, rather than actually sleeping,
54 a02b89cf Guido Trotter
  executes asyncore events happening in the meantime.
55 a02b89cf Guido Trotter

56 a02b89cf Guido Trotter
  After an event has occurred, rather than returning, it raises a
57 a02b89cf Guido Trotter
  SchedulerBreakout exception, which will force the current scheduler.run()
58 a02b89cf Guido Trotter
  invocation to terminate, so that we can also check for signals. The main loop
59 a02b89cf Guido Trotter
  will then call the scheduler run again, which will allow it to actually
60 a02b89cf Guido Trotter
  process any due events.
61 a02b89cf Guido Trotter

62 a02b89cf Guido Trotter
  This is needed because scheduler.run() doesn't support a count=..., as
63 a02b89cf Guido Trotter
  asyncore loop, and the scheduler module documents throwing exceptions from
64 a02b89cf Guido Trotter
  inside the delay function as an allowed usage model.
65 a02b89cf Guido Trotter

66 a02b89cf Guido Trotter
  """
67 a02b89cf Guido Trotter
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
68 a02b89cf Guido Trotter
  raise SchedulerBreakout()
69 a02b89cf Guido Trotter
70 a02b89cf Guido Trotter
71 a02b89cf Guido Trotter
class AsyncoreScheduler(sched.scheduler):
72 a02b89cf Guido Trotter
  """Event scheduler integrated with asyncore
73 a02b89cf Guido Trotter

74 a02b89cf Guido Trotter
  """
75 a02b89cf Guido Trotter
  def __init__(self, timefunc):
76 a02b89cf Guido Trotter
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
77 821d9e43 Michael Hanselmann
78 821d9e43 Michael Hanselmann
79 b11780bb Guido Trotter
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
80 b11780bb Guido Trotter
  """Base Ganeti Asyncore Dispacher
81 b11780bb Guido Trotter

82 b11780bb Guido Trotter
  """
83 b11780bb Guido Trotter
  # this method is overriding an asyncore.dispatcher method
84 b11780bb Guido Trotter
  def handle_error(self):
85 b11780bb Guido Trotter
    """Log an error in handling any request, and proceed.
86 b11780bb Guido Trotter

87 b11780bb Guido Trotter
    """
88 b11780bb Guido Trotter
    logging.exception("Error while handling asyncore request")
89 b11780bb Guido Trotter
90 b11780bb Guido Trotter
  # this method is overriding an asyncore.dispatcher method
91 b11780bb Guido Trotter
  def writable(self):
92 b11780bb Guido Trotter
    """Most of the time we don't want to check for writability.
93 b11780bb Guido Trotter

94 b11780bb Guido Trotter
    """
95 b11780bb Guido Trotter
    return False
96 b11780bb Guido Trotter
97 b11780bb Guido Trotter
98 a4b605ae Guido Trotter
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
99 a4b605ae Guido Trotter
  """A stream server to use with asyncore.
100 a4b605ae Guido Trotter

101 a4b605ae Guido Trotter
  Each request is accepted, and then dispatched to a separate asyncore
102 a4b605ae Guido Trotter
  dispatcher to handle.
103 a4b605ae Guido Trotter

104 a4b605ae Guido Trotter
  """
105 a4b605ae Guido Trotter
106 a4b605ae Guido Trotter
  _REQUEST_QUEUE_SIZE = 5
107 a4b605ae Guido Trotter
108 a4b605ae Guido Trotter
  def __init__(self, family, address):
109 a4b605ae Guido Trotter
    """Constructor for AsyncUnixStreamSocket
110 a4b605ae Guido Trotter

111 a4b605ae Guido Trotter
    @type family: integer
112 a4b605ae Guido Trotter
    @param family: socket family (one of socket.AF_*)
113 a4b605ae Guido Trotter
    @type address: address family dependent
114 a4b605ae Guido Trotter
    @param address: address to bind the socket to
115 a4b605ae Guido Trotter

116 a4b605ae Guido Trotter
    """
117 a4b605ae Guido Trotter
    GanetiBaseAsyncoreDispatcher.__init__(self)
118 a4b605ae Guido Trotter
    self.family = family
119 a4b605ae Guido Trotter
    self.create_socket(self.family, socket.SOCK_STREAM)
120 a4b605ae Guido Trotter
    self.set_reuse_addr()
121 a4b605ae Guido Trotter
    self.bind(address)
122 a4b605ae Guido Trotter
    self.listen(self._REQUEST_QUEUE_SIZE)
123 a4b605ae Guido Trotter
124 a4b605ae Guido Trotter
  # this method is overriding an asyncore.dispatcher method
125 a4b605ae Guido Trotter
  def handle_accept(self):
126 a4b605ae Guido Trotter
    """Accept a new client connection.
127 a4b605ae Guido Trotter

128 a4b605ae Guido Trotter
    Creates a new instance of the handler class, which will use asyncore to
129 a4b605ae Guido Trotter
    serve the client.
130 a4b605ae Guido Trotter

131 a4b605ae Guido Trotter
    """
132 a4b605ae Guido Trotter
    accept_result = utils.IgnoreSignals(self.accept)
133 a4b605ae Guido Trotter
    if accept_result is not None:
134 a4b605ae Guido Trotter
      connected_socket, client_address = accept_result
135 a4b605ae Guido Trotter
      if self.family == socket.AF_UNIX:
136 a4b605ae Guido Trotter
        # override the client address, as for unix sockets nothing meaningful
137 a4b605ae Guido Trotter
        # is passed in from accept anyway
138 a744b676 Manuel Franceschini
        client_address = netutils.GetSocketCredentials(connected_socket)
139 a4b605ae Guido Trotter
      logging.info("Accepted connection from %s",
140 981732fb Manuel Franceschini
                   netutils.FormatAddress(client_address, family=self.family))
141 a4b605ae Guido Trotter
      self.handle_connection(connected_socket, client_address)
142 a4b605ae Guido Trotter
143 a4b605ae Guido Trotter
  def handle_connection(self, connected_socket, client_address):
144 a4b605ae Guido Trotter
    """Handle an already accepted connection.
145 a4b605ae Guido Trotter

146 a4b605ae Guido Trotter
    """
147 a4b605ae Guido Trotter
    raise NotImplementedError
148 a4b605ae Guido Trotter
149 a4b605ae Guido Trotter
150 b66ab629 Guido Trotter
class AsyncTerminatedMessageStream(asynchat.async_chat):
151 b66ab629 Guido Trotter
  """A terminator separated message stream asyncore module.
152 b66ab629 Guido Trotter

153 b66ab629 Guido Trotter
  Handles a stream connection receiving messages terminated by a defined
154 b66ab629 Guido Trotter
  separator. For each complete message handle_message is called.
155 b66ab629 Guido Trotter

156 b66ab629 Guido Trotter
  """
157 37e62cb9 Guido Trotter
  def __init__(self, connected_socket, peer_address, terminator, family,
158 37e62cb9 Guido Trotter
               unhandled_limit):
159 b66ab629 Guido Trotter
    """AsyncTerminatedMessageStream constructor.
160 b66ab629 Guido Trotter

161 b66ab629 Guido Trotter
    @type connected_socket: socket.socket
162 b66ab629 Guido Trotter
    @param connected_socket: connected stream socket to receive messages from
163 b66ab629 Guido Trotter
    @param peer_address: family-specific peer address
164 b66ab629 Guido Trotter
    @type terminator: string
165 b66ab629 Guido Trotter
    @param terminator: terminator separating messages in the stream
166 b66ab629 Guido Trotter
    @type family: integer
167 b66ab629 Guido Trotter
    @param family: socket family
168 37e62cb9 Guido Trotter
    @type unhandled_limit: integer or None
169 37e62cb9 Guido Trotter
    @param unhandled_limit: maximum unanswered messages
170 b66ab629 Guido Trotter

171 b66ab629 Guido Trotter
    """
172 b66ab629 Guido Trotter
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
173 b66ab629 Guido Trotter
    # using a positional argument rather than a keyword one.
174 b66ab629 Guido Trotter
    asynchat.async_chat.__init__(self, connected_socket)
175 b66ab629 Guido Trotter
    self.connected_socket = connected_socket
176 b66ab629 Guido Trotter
    # on python 2.4 there is no "family" attribute for the socket class
177 b66ab629 Guido Trotter
    # FIXME: when we move to python 2.5 or above remove the family parameter
178 b66ab629 Guido Trotter
    #self.family = self.connected_socket.family
179 b66ab629 Guido Trotter
    self.family = family
180 b66ab629 Guido Trotter
    self.peer_address = peer_address
181 b66ab629 Guido Trotter
    self.terminator = terminator
182 37e62cb9 Guido Trotter
    self.unhandled_limit = unhandled_limit
183 b66ab629 Guido Trotter
    self.set_terminator(terminator)
184 b66ab629 Guido Trotter
    self.ibuffer = []
185 37e62cb9 Guido Trotter
    self.receive_count = 0
186 37e62cb9 Guido Trotter
    self.send_count = 0
187 1e063ccd Guido Trotter
    self.oqueue = collections.deque()
188 37e62cb9 Guido Trotter
    self.iqueue = collections.deque()
189 b66ab629 Guido Trotter
190 b66ab629 Guido Trotter
  # this method is overriding an asynchat.async_chat method
191 b66ab629 Guido Trotter
  def collect_incoming_data(self, data):
192 b66ab629 Guido Trotter
    self.ibuffer.append(data)
193 b66ab629 Guido Trotter
194 37e62cb9 Guido Trotter
  def _can_handle_message(self):
195 37e62cb9 Guido Trotter
    return (self.unhandled_limit is None or
196 37e62cb9 Guido Trotter
            (self.receive_count < self.send_count + self.unhandled_limit) and
197 37e62cb9 Guido Trotter
             not self.iqueue)
198 37e62cb9 Guido Trotter
199 b66ab629 Guido Trotter
  # this method is overriding an asynchat.async_chat method
200 b66ab629 Guido Trotter
  def found_terminator(self):
201 b66ab629 Guido Trotter
    message = "".join(self.ibuffer)
202 b66ab629 Guido Trotter
    self.ibuffer = []
203 37e62cb9 Guido Trotter
    message_id = self.receive_count
204 37e62cb9 Guido Trotter
    # We need to increase the receive_count after checking if the message can
205 37e62cb9 Guido Trotter
    # be handled, but before calling handle_message
206 37e62cb9 Guido Trotter
    can_handle = self._can_handle_message()
207 37e62cb9 Guido Trotter
    self.receive_count += 1
208 37e62cb9 Guido Trotter
    if can_handle:
209 37e62cb9 Guido Trotter
      self.handle_message(message, message_id)
210 37e62cb9 Guido Trotter
    else:
211 37e62cb9 Guido Trotter
      self.iqueue.append((message, message_id))
212 b66ab629 Guido Trotter
213 b66ab629 Guido Trotter
  def handle_message(self, message, message_id):
214 b66ab629 Guido Trotter
    """Handle a terminated message.
215 b66ab629 Guido Trotter

216 b66ab629 Guido Trotter
    @type message: string
217 b66ab629 Guido Trotter
    @param message: message to handle
218 b66ab629 Guido Trotter
    @type message_id: integer
219 b66ab629 Guido Trotter
    @param message_id: stream's message sequence number
220 b66ab629 Guido Trotter

221 b66ab629 Guido Trotter
    """
222 b66ab629 Guido Trotter
    pass
223 b66ab629 Guido Trotter
    # TODO: move this method to raise NotImplementedError
224 b66ab629 Guido Trotter
    # raise NotImplementedError
225 b66ab629 Guido Trotter
226 1e063ccd Guido Trotter
  def send_message(self, message):
227 1e063ccd Guido Trotter
    """Send a message to the remote peer. This function is thread-safe.
228 1e063ccd Guido Trotter

229 1e063ccd Guido Trotter
    @type message: string
230 1e063ccd Guido Trotter
    @param message: message to send, without the terminator
231 1e063ccd Guido Trotter

232 1e063ccd Guido Trotter
    @warning: If calling this function from a thread different than the one
233 1e063ccd Guido Trotter
    performing the main asyncore loop, remember that you have to wake that one
234 1e063ccd Guido Trotter
    up.
235 1e063ccd Guido Trotter

236 1e063ccd Guido Trotter
    """
237 1e063ccd Guido Trotter
    # If we just append the message we received to the output queue, this
238 1e063ccd Guido Trotter
    # function can be safely called by multiple threads at the same time, and
239 37e62cb9 Guido Trotter
    # we don't need locking, since deques are thread safe. handle_write in the
240 37e62cb9 Guido Trotter
    # asyncore thread will handle the next input message if there are any
241 37e62cb9 Guido Trotter
    # enqueued.
242 1e063ccd Guido Trotter
    self.oqueue.append(message)
243 1e063ccd Guido Trotter
244 1e063ccd Guido Trotter
  # this method is overriding an asyncore.dispatcher method
245 37e62cb9 Guido Trotter
  def readable(self):
246 37e62cb9 Guido Trotter
    # read from the socket if we can handle the next requests
247 37e62cb9 Guido Trotter
    return self._can_handle_message() and asynchat.async_chat.readable(self)
248 37e62cb9 Guido Trotter
249 37e62cb9 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
250 1e063ccd Guido Trotter
  def writable(self):
251 1e063ccd Guido Trotter
    # the output queue may become full just after we called writable. This only
252 1e063ccd Guido Trotter
    # works if we know we'll have something else waking us up from the select,
253 1e063ccd Guido Trotter
    # in such case, anyway.
254 1e063ccd Guido Trotter
    return asynchat.async_chat.writable(self) or self.oqueue
255 1e063ccd Guido Trotter
256 1e063ccd Guido Trotter
  # this method is overriding an asyncore.dispatcher method
257 1e063ccd Guido Trotter
  def handle_write(self):
258 1e063ccd Guido Trotter
    if self.oqueue:
259 37e62cb9 Guido Trotter
      # if we have data in the output queue, then send_message was called.
260 37e62cb9 Guido Trotter
      # this means we can process one more message from the input queue, if
261 37e62cb9 Guido Trotter
      # there are any.
262 1e063ccd Guido Trotter
      data = self.oqueue.popleft()
263 1e063ccd Guido Trotter
      self.push(data + self.terminator)
264 37e62cb9 Guido Trotter
      self.send_count += 1
265 37e62cb9 Guido Trotter
      if self.iqueue:
266 37e62cb9 Guido Trotter
        self.handle_message(*self.iqueue.popleft())
267 1e063ccd Guido Trotter
    self.initiate_send()
268 1e063ccd Guido Trotter
269 b66ab629 Guido Trotter
  def close_log(self):
270 b66ab629 Guido Trotter
    logging.info("Closing connection from %s",
271 981732fb Manuel Franceschini
                 netutils.FormatAddress(self.peer_address, family=self.family))
272 b66ab629 Guido Trotter
    self.close()
273 b66ab629 Guido Trotter
274 b66ab629 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
275 b66ab629 Guido Trotter
  def handle_expt(self):
276 b66ab629 Guido Trotter
    self.close_log()
277 b66ab629 Guido Trotter
278 b66ab629 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
279 b66ab629 Guido Trotter
  def handle_error(self):
280 b66ab629 Guido Trotter
    """Log an error in handling any request, and proceed.
281 b66ab629 Guido Trotter

282 b66ab629 Guido Trotter
    """
283 b66ab629 Guido Trotter
    logging.exception("Error while handling asyncore request")
284 b66ab629 Guido Trotter
    self.close_log()
285 b66ab629 Guido Trotter
286 b66ab629 Guido Trotter
287 b11780bb Guido Trotter
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
288 5f3269fc Guido Trotter
  """An improved asyncore udp socket.
289 5f3269fc Guido Trotter

290 5f3269fc Guido Trotter
  """
291 d8bcfe21 Manuel Franceschini
  def __init__(self, family):
292 5f3269fc Guido Trotter
    """Constructor for AsyncUDPSocket
293 5f3269fc Guido Trotter

294 5f3269fc Guido Trotter
    """
295 b11780bb Guido Trotter
    GanetiBaseAsyncoreDispatcher.__init__(self)
296 5f3269fc Guido Trotter
    self._out_queue = []
297 d8bcfe21 Manuel Franceschini
    self._family = family
298 d8bcfe21 Manuel Franceschini
    self.create_socket(family, socket.SOCK_DGRAM)
299 5f3269fc Guido Trotter
300 5f3269fc Guido Trotter
  # this method is overriding an asyncore.dispatcher method
301 5f3269fc Guido Trotter
  def handle_connect(self):
302 5f3269fc Guido Trotter
    # Python thinks that the first udp message from a source qualifies as a
303 5f3269fc Guido Trotter
    # "connect" and further ones are part of the same connection. We beg to
304 5f3269fc Guido Trotter
    # differ and treat all messages equally.
305 5f3269fc Guido Trotter
    pass
306 5f3269fc Guido Trotter
307 5f3269fc Guido Trotter
  # this method is overriding an asyncore.dispatcher method
308 5f3269fc Guido Trotter
  def handle_read(self):
309 6e7e58b4 Guido Trotter
    recv_result = utils.IgnoreSignals(self.recvfrom,
310 6e7e58b4 Guido Trotter
                                      constants.MAX_UDP_DATA_SIZE)
311 6e7e58b4 Guido Trotter
    if recv_result is not None:
312 6e7e58b4 Guido Trotter
      payload, address = recv_result
313 d8bcfe21 Manuel Franceschini
      if self._family == socket.AF_INET6:
314 d8bcfe21 Manuel Franceschini
        # we ignore 'flow info' and 'scope id' as we don't need them
315 d8bcfe21 Manuel Franceschini
        ip, port, _, _ = address
316 d8bcfe21 Manuel Franceschini
      else:
317 d8bcfe21 Manuel Franceschini
        ip, port = address
318 d8bcfe21 Manuel Franceschini
319 6e7e58b4 Guido Trotter
      self.handle_datagram(payload, ip, port)
320 5f3269fc Guido Trotter
321 5f3269fc Guido Trotter
  def handle_datagram(self, payload, ip, port):
322 5f3269fc Guido Trotter
    """Handle an already read udp datagram
323 5f3269fc Guido Trotter

324 5f3269fc Guido Trotter
    """
325 5f3269fc Guido Trotter
    raise NotImplementedError
326 5f3269fc Guido Trotter
327 5f3269fc Guido Trotter
  # this method is overriding an asyncore.dispatcher method
328 5f3269fc Guido Trotter
  def writable(self):
329 5f3269fc Guido Trotter
    # We should check whether we can write to the socket only if we have
330 5f3269fc Guido Trotter
    # something scheduled to be written
331 5f3269fc Guido Trotter
    return bool(self._out_queue)
332 5f3269fc Guido Trotter
333 48bf6352 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
334 5f3269fc Guido Trotter
  def handle_write(self):
335 3660fcf5 Guido Trotter
    if not self._out_queue:
336 3660fcf5 Guido Trotter
      logging.error("handle_write called with empty output queue")
337 3660fcf5 Guido Trotter
      return
338 3660fcf5 Guido Trotter
    (ip, port, payload) = self._out_queue[0]
339 232144d0 Guido Trotter
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
340 3660fcf5 Guido Trotter
    self._out_queue.pop(0)
341 3660fcf5 Guido Trotter
342 5f3269fc Guido Trotter
  def enqueue_send(self, ip, port, payload):
343 5f3269fc Guido Trotter
    """Enqueue a datagram to be sent when possible
344 5f3269fc Guido Trotter

345 5f3269fc Guido Trotter
    """
346 c8eded0b Guido Trotter
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
347 c8eded0b Guido Trotter
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
348 c8eded0b Guido Trotter
                                    constants.MAX_UDP_DATA_SIZE))
349 5f3269fc Guido Trotter
    self._out_queue.append((ip, port, payload))
350 5f3269fc Guido Trotter
351 6ddf5c8f Guido Trotter
  def process_next_packet(self, timeout=0):
352 6ddf5c8f Guido Trotter
    """Process the next datagram, waiting for it if necessary.
353 6ddf5c8f Guido Trotter

354 6ddf5c8f Guido Trotter
    @type timeout: float
355 6ddf5c8f Guido Trotter
    @param timeout: how long to wait for data
356 6ddf5c8f Guido Trotter
    @rtype: boolean
357 6ddf5c8f Guido Trotter
    @return: True if some data has been handled, False otherwise
358 6ddf5c8f Guido Trotter

359 6ddf5c8f Guido Trotter
    """
360 1b429e2a Iustin Pop
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
361 1b429e2a Iustin Pop
    if result is not None and result & select.POLLIN:
362 3660fcf5 Guido Trotter
      self.handle_read()
363 6ddf5c8f Guido Trotter
      return True
364 6ddf5c8f Guido Trotter
    else:
365 6ddf5c8f Guido Trotter
      return False
366 6ddf5c8f Guido Trotter
367 5f3269fc Guido Trotter
368 495ba852 Guido Trotter
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
369 495ba852 Guido Trotter
  """A way to notify the asyncore loop that something is going on.
370 495ba852 Guido Trotter

371 495ba852 Guido Trotter
  If an asyncore daemon is multithreaded when a thread tries to push some data
372 495ba852 Guido Trotter
  to a socket, the main loop handling asynchronous requests might be sleeping
373 495ba852 Guido Trotter
  waiting on a select(). To avoid this it can create an instance of the
374 495ba852 Guido Trotter
  AsyncAwaker, which other threads can use to wake it up.
375 495ba852 Guido Trotter

376 495ba852 Guido Trotter
  """
377 495ba852 Guido Trotter
  def __init__(self, signal_fn=None):
378 495ba852 Guido Trotter
    """Constructor for AsyncAwaker
379 495ba852 Guido Trotter

380 495ba852 Guido Trotter
    @type signal_fn: function
381 495ba852 Guido Trotter
    @param signal_fn: function to call when awaken
382 495ba852 Guido Trotter

383 495ba852 Guido Trotter
    """
384 495ba852 Guido Trotter
    GanetiBaseAsyncoreDispatcher.__init__(self)
385 495ba852 Guido Trotter
    assert signal_fn == None or callable(signal_fn)
386 495ba852 Guido Trotter
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
387 495ba852 Guido Trotter
                                                          socket.SOCK_STREAM)
388 495ba852 Guido Trotter
    self.in_socket.setblocking(0)
389 b628191f Guido Trotter
    self.in_socket.shutdown(socket.SHUT_WR)
390 b628191f Guido Trotter
    self.out_socket.shutdown(socket.SHUT_RD)
391 495ba852 Guido Trotter
    self.set_socket(self.in_socket)
392 495ba852 Guido Trotter
    self.need_signal = True
393 495ba852 Guido Trotter
    self.signal_fn = signal_fn
394 495ba852 Guido Trotter
    self.connected = True
395 495ba852 Guido Trotter
396 495ba852 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
397 495ba852 Guido Trotter
  def handle_read(self):
398 495ba852 Guido Trotter
    utils.IgnoreSignals(self.recv, 4096)
399 495ba852 Guido Trotter
    if self.signal_fn:
400 495ba852 Guido Trotter
      self.signal_fn()
401 495ba852 Guido Trotter
    self.need_signal = True
402 495ba852 Guido Trotter
403 495ba852 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
404 495ba852 Guido Trotter
  def close(self):
405 495ba852 Guido Trotter
    asyncore.dispatcher.close(self)
406 495ba852 Guido Trotter
    self.out_socket.close()
407 495ba852 Guido Trotter
408 495ba852 Guido Trotter
  def signal(self):
409 495ba852 Guido Trotter
    """Signal the asyncore main loop.
410 495ba852 Guido Trotter

411 495ba852 Guido Trotter
    Any data we send here will be ignored, but it will cause the select() call
412 495ba852 Guido Trotter
    to return.
413 495ba852 Guido Trotter

414 495ba852 Guido Trotter
    """
415 495ba852 Guido Trotter
    # Yes, there is a race condition here. No, we don't care, at worst we're
416 495ba852 Guido Trotter
    # sending more than one wakeup token, which doesn't harm at all.
417 495ba852 Guido Trotter
    if self.need_signal:
418 495ba852 Guido Trotter
      self.need_signal = False
419 495ba852 Guido Trotter
      self.out_socket.send("\0")
420 495ba852 Guido Trotter
421 495ba852 Guido Trotter
422 821d9e43 Michael Hanselmann
class Mainloop(object):
423 821d9e43 Michael Hanselmann
  """Generic mainloop for daemons
424 821d9e43 Michael Hanselmann

425 69b99987 Michael Hanselmann
  @ivar scheduler: A sched.scheduler object, which can be used to register
426 69b99987 Michael Hanselmann
    timed events
427 69b99987 Michael Hanselmann

428 821d9e43 Michael Hanselmann
  """
429 821d9e43 Michael Hanselmann
  def __init__(self):
430 b14b975f Michael Hanselmann
    """Constructs a new Mainloop instance.
431 b14b975f Michael Hanselmann

432 b14b975f Michael Hanselmann
    """
433 821d9e43 Michael Hanselmann
    self._signal_wait = []
434 a02b89cf Guido Trotter
    self.scheduler = AsyncoreScheduler(time.time)
435 821d9e43 Michael Hanselmann
436 9b739173 Guido Trotter
  @utils.SignalHandled([signal.SIGCHLD])
437 9b739173 Guido Trotter
  @utils.SignalHandled([signal.SIGTERM])
438 f59dce3e Guido Trotter
  @utils.SignalHandled([signal.SIGINT])
439 69b99987 Michael Hanselmann
  def Run(self, signal_handlers=None):
440 b14b975f Michael Hanselmann
    """Runs the mainloop.
441 b14b975f Michael Hanselmann

442 9b739173 Guido Trotter
    @type signal_handlers: dict
443 9b739173 Guido Trotter
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
444 b14b975f Michael Hanselmann

445 b14b975f Michael Hanselmann
    """
446 9b739173 Guido Trotter
    assert isinstance(signal_handlers, dict) and \
447 9b739173 Guido Trotter
           len(signal_handlers) > 0, \
448 9b739173 Guido Trotter
           "Broken SignalHandled decorator"
449 9b739173 Guido Trotter
    running = True
450 9b739173 Guido Trotter
    # Start actual main loop
451 9b739173 Guido Trotter
    while running:
452 a02b89cf Guido Trotter
      if not self.scheduler.empty():
453 a02b89cf Guido Trotter
        try:
454 a02b89cf Guido Trotter
          self.scheduler.run()
455 a02b89cf Guido Trotter
        except SchedulerBreakout:
456 a02b89cf Guido Trotter
          pass
457 a02b89cf Guido Trotter
      else:
458 a02b89cf Guido Trotter
        asyncore.loop(count=1, use_poll=True)
459 9b739173 Guido Trotter
460 9b739173 Guido Trotter
      # Check whether a signal was raised
461 9b739173 Guido Trotter
      for sig in signal_handlers:
462 9b739173 Guido Trotter
        handler = signal_handlers[sig]
463 9b739173 Guido Trotter
        if handler.called:
464 9b739173 Guido Trotter
          self._CallSignalWaiters(sig)
465 f59dce3e Guido Trotter
          running = sig not in (signal.SIGTERM, signal.SIGINT)
466 9b739173 Guido Trotter
          handler.Clear()
467 a570e2a8 Guido Trotter
468 b14b975f Michael Hanselmann
  def _CallSignalWaiters(self, signum):
469 b14b975f Michael Hanselmann
    """Calls all signal waiters for a certain signal.
470 b14b975f Michael Hanselmann

471 b14b975f Michael Hanselmann
    @type signum: int
472 b14b975f Michael Hanselmann
    @param signum: Signal number
473 b14b975f Michael Hanselmann

474 b14b975f Michael Hanselmann
    """
475 b14b975f Michael Hanselmann
    for owner in self._signal_wait:
476 a9fe7232 Guido Trotter
      owner.OnSignal(signum)
477 821d9e43 Michael Hanselmann
478 821d9e43 Michael Hanselmann
  def RegisterSignal(self, owner):
479 821d9e43 Michael Hanselmann
    """Registers a receiver for signal notifications
480 821d9e43 Michael Hanselmann

481 821d9e43 Michael Hanselmann
    The receiver must support a "OnSignal(self, signum)" function.
482 821d9e43 Michael Hanselmann

483 821d9e43 Michael Hanselmann
    @type owner: instance
484 821d9e43 Michael Hanselmann
    @param owner: Receiver
485 821d9e43 Michael Hanselmann

486 821d9e43 Michael Hanselmann
    """
487 821d9e43 Michael Hanselmann
    self._signal_wait.append(owner)
488 b11c9e5c Michael Hanselmann
489 04ccf5e9 Guido Trotter
490 fd346851 René Nussbaumer
def GenericMain(daemon_name, optionparser, check_fn, exec_fn,
491 1c54156d Luca Bigliardi
                multithreaded=False, console_logging=False,
492 0070a462 René Nussbaumer
                default_ssl_cert=None, default_ssl_key=None):
493 04ccf5e9 Guido Trotter
  """Shared main function for daemons.
494 04ccf5e9 Guido Trotter

495 04ccf5e9 Guido Trotter
  @type daemon_name: string
496 04ccf5e9 Guido Trotter
  @param daemon_name: daemon name
497 69b99987 Michael Hanselmann
  @type optionparser: optparse.OptionParser
498 04ccf5e9 Guido Trotter
  @param optionparser: initialized optionparser with daemon-specific options
499 04ccf5e9 Guido Trotter
                       (common -f -d options will be handled by this module)
500 04ccf5e9 Guido Trotter
  @type check_fn: function which accepts (options, args)
501 04ccf5e9 Guido Trotter
  @param check_fn: function that checks start conditions and exits if they're
502 04ccf5e9 Guido Trotter
                   not met
503 04ccf5e9 Guido Trotter
  @type exec_fn: function which accepts (options, args)
504 04ccf5e9 Guido Trotter
  @param exec_fn: function that's executed with the daemon's pid file held, and
505 04ccf5e9 Guido Trotter
                  runs the daemon itself.
506 30dabd03 Michael Hanselmann
  @type multithreaded: bool
507 30dabd03 Michael Hanselmann
  @param multithreaded: Whether the daemon uses threads
508 ff917534 Luca Bigliardi
  @type console_logging: boolean
509 ff917534 Luca Bigliardi
  @param console_logging: if True, the daemon will fall back to the system
510 ff917534 Luca Bigliardi
                          console if logging fails
511 0648750e Michael Hanselmann
  @type default_ssl_cert: string
512 0648750e Michael Hanselmann
  @param default_ssl_cert: Default SSL certificate path
513 0648750e Michael Hanselmann
  @type default_ssl_key: string
514 0648750e Michael Hanselmann
  @param default_ssl_key: Default SSL key path
515 04ccf5e9 Guido Trotter

516 04ccf5e9 Guido Trotter
  """
517 04ccf5e9 Guido Trotter
  optionparser.add_option("-f", "--foreground", dest="fork",
518 04ccf5e9 Guido Trotter
                          help="Don't detach from the current terminal",
519 04ccf5e9 Guido Trotter
                          default=True, action="store_false")
520 04ccf5e9 Guido Trotter
  optionparser.add_option("-d", "--debug", dest="debug",
521 04ccf5e9 Guido Trotter
                          help="Enable some debug messages",
522 04ccf5e9 Guido Trotter
                          default=False, action="store_true")
523 551b6283 Iustin Pop
  optionparser.add_option("--syslog", dest="syslog",
524 551b6283 Iustin Pop
                          help="Enable logging to syslog (except debug"
525 551b6283 Iustin Pop
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
526 551b6283 Iustin Pop
                          constants.SYSLOG_USAGE,
527 551b6283 Iustin Pop
                          default=constants.SYSLOG_USAGE,
528 551b6283 Iustin Pop
                          choices=["no", "yes", "only"])
529 0a71aa17 Michael Hanselmann
530 04ccf5e9 Guido Trotter
  if daemon_name in constants.DAEMONS_PORTS:
531 14f5f1b6 Manuel Franceschini
    default_bind_address = constants.IP4_ADDRESS_ANY
532 7dd999fc Manuel Franceschini
    family = ssconf.SimpleStore().GetPrimaryIPFamily()
533 7dd999fc Manuel Franceschini
    # family will default to AF_INET if there is no ssconf file (e.g. when
534 7dd999fc Manuel Franceschini
    # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
535 7dd999fc Manuel Franceschini
    # <= 2.2 can not be AF_INET6
536 7dd999fc Manuel Franceschini
    if family == netutils.IP6Address.family:
537 7dd999fc Manuel Franceschini
      default_bind_address = constants.IP6_ADDRESS_ANY
538 e7323b5e Manuel Franceschini
539 a744b676 Manuel Franceschini
    default_port = netutils.GetDaemonPort(daemon_name)
540 0a71aa17 Michael Hanselmann
541 0a71aa17 Michael Hanselmann
    # For networked daemons we allow choosing the port and bind address
542 04ccf5e9 Guido Trotter
    optionparser.add_option("-p", "--port", dest="port",
543 0a71aa17 Michael Hanselmann
                            help="Network port (default: %s)" % default_port,
544 0a71aa17 Michael Hanselmann
                            default=default_port, type="int")
545 04ccf5e9 Guido Trotter
    optionparser.add_option("-b", "--bind", dest="bind_address",
546 e7323b5e Manuel Franceschini
                            help=("Bind address (default: '%s')" %
547 0a71aa17 Michael Hanselmann
                                  default_bind_address),
548 0a71aa17 Michael Hanselmann
                            default=default_bind_address, metavar="ADDRESS")
549 04ccf5e9 Guido Trotter
550 0648750e Michael Hanselmann
  if default_ssl_key is not None and default_ssl_cert is not None:
551 3b1b0cb6 Guido Trotter
    optionparser.add_option("--no-ssl", dest="ssl",
552 3b1b0cb6 Guido Trotter
                            help="Do not secure HTTP protocol with SSL",
553 3b1b0cb6 Guido Trotter
                            default=True, action="store_false")
554 3b1b0cb6 Guido Trotter
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
555 0648750e Michael Hanselmann
                            help=("SSL key path (default: %s)" %
556 0648750e Michael Hanselmann
                                  default_ssl_key),
557 0648750e Michael Hanselmann
                            default=default_ssl_key, type="string",
558 0648750e Michael Hanselmann
                            metavar="SSL_KEY_PATH")
559 3b1b0cb6 Guido Trotter
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
560 0648750e Michael Hanselmann
                            help=("SSL certificate path (default: %s)" %
561 0648750e Michael Hanselmann
                                  default_ssl_cert),
562 0648750e Michael Hanselmann
                            default=default_ssl_cert, type="string",
563 0648750e Michael Hanselmann
                            metavar="SSL_CERT_PATH")
564 3b1b0cb6 Guido Trotter
565 30dabd03 Michael Hanselmann
  # Disable the use of fork(2) if the daemon uses threads
566 30dabd03 Michael Hanselmann
  utils.no_fork = multithreaded
567 04ccf5e9 Guido Trotter
568 04ccf5e9 Guido Trotter
  options, args = optionparser.parse_args()
569 04ccf5e9 Guido Trotter
570 0648750e Michael Hanselmann
  if getattr(options, "ssl", False):
571 0648750e Michael Hanselmann
    ssl_paths = {
572 0648750e Michael Hanselmann
      "certificate": options.ssl_cert,
573 0648750e Michael Hanselmann
      "key": options.ssl_key,
574 0648750e Michael Hanselmann
      }
575 0648750e Michael Hanselmann
576 0648750e Michael Hanselmann
    for name, path in ssl_paths.iteritems():
577 0648750e Michael Hanselmann
      if not os.path.isfile(path):
578 0648750e Michael Hanselmann
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
579 3b1b0cb6 Guido Trotter
        sys.exit(constants.EXIT_FAILURE)
580 3b1b0cb6 Guido Trotter
581 0648750e Michael Hanselmann
    # TODO: By initiating http.HttpSslParams here we would only read the files
582 0648750e Michael Hanselmann
    # once and have a proper validation (isfile returns False on directories)
583 0648750e Michael Hanselmann
    # at the same time.
584 0648750e Michael Hanselmann
585 3b1b0cb6 Guido Trotter
  if check_fn is not None:
586 3b1b0cb6 Guido Trotter
    check_fn(options, args)
587 3b1b0cb6 Guido Trotter
588 04ccf5e9 Guido Trotter
  if options.fork:
589 04ccf5e9 Guido Trotter
    utils.CloseFDs()
590 0070a462 René Nussbaumer
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
591 04ccf5e9 Guido Trotter
592 04ccf5e9 Guido Trotter
  utils.WritePidFile(daemon_name)
593 04ccf5e9 Guido Trotter
  try:
594 04ccf5e9 Guido Trotter
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
595 04ccf5e9 Guido Trotter
                       debug=options.debug,
596 04ccf5e9 Guido Trotter
                       stderr_logging=not options.fork,
597 e7307f08 Michael Hanselmann
                       multithreaded=multithreaded,
598 551b6283 Iustin Pop
                       program=daemon_name,
599 ff917534 Luca Bigliardi
                       syslog=options.syslog,
600 ff917534 Luca Bigliardi
                       console_logging=console_logging)
601 099c52ad Iustin Pop
    logging.info("%s daemon startup", daemon_name)
602 04ccf5e9 Guido Trotter
    exec_fn(options, args)
603 04ccf5e9 Guido Trotter
  finally:
604 04ccf5e9 Guido Trotter
    utils.RemovePidFile(daemon_name)