Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ 66e884e1

History | View | Annotate | Download (23.1 kB)

1 821d9e43 Michael Hanselmann
#
2 821d9e43 Michael Hanselmann
#
3 821d9e43 Michael Hanselmann
4 1a8337f2 Manuel Franceschini
# Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
5 821d9e43 Michael Hanselmann
#
6 821d9e43 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 821d9e43 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 821d9e43 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 821d9e43 Michael Hanselmann
# (at your option) any later version.
10 821d9e43 Michael Hanselmann
#
11 821d9e43 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 821d9e43 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 821d9e43 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 821d9e43 Michael Hanselmann
# General Public License for more details.
15 821d9e43 Michael Hanselmann
#
16 821d9e43 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 821d9e43 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 821d9e43 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 821d9e43 Michael Hanselmann
# 02110-1301, USA.
20 821d9e43 Michael Hanselmann
21 821d9e43 Michael Hanselmann
22 821d9e43 Michael Hanselmann
"""Module with helper classes and functions for daemons"""
23 821d9e43 Michael Hanselmann
24 821d9e43 Michael Hanselmann
25 112d240d Guido Trotter
import asyncore
26 b66ab629 Guido Trotter
import asynchat
27 1e063ccd Guido Trotter
import collections
28 3b1b0cb6 Guido Trotter
import os
29 821d9e43 Michael Hanselmann
import signal
30 04ccf5e9 Guido Trotter
import logging
31 a02b89cf Guido Trotter
import sched
32 a02b89cf Guido Trotter
import time
33 5f3269fc Guido Trotter
import socket
34 6ddf5c8f Guido Trotter
import select
35 c124045f Iustin Pop
import sys
36 821d9e43 Michael Hanselmann
37 821d9e43 Michael Hanselmann
from ganeti import utils
38 04ccf5e9 Guido Trotter
from ganeti import constants
39 a02b89cf Guido Trotter
from ganeti import errors
40 a744b676 Manuel Franceschini
from ganeti import netutils
41 e7323b5e Manuel Franceschini
from ganeti import ssconf
42 f4ec2960 René Nussbaumer
from ganeti import runtime
43 a02b89cf Guido Trotter
44 a02b89cf Guido Trotter
45 a02b89cf Guido Trotter
class SchedulerBreakout(Exception):
46 a02b89cf Guido Trotter
  """Exception used to get out of the scheduler loop
47 a02b89cf Guido Trotter

48 a02b89cf Guido Trotter
  """
49 a02b89cf Guido Trotter
50 a02b89cf Guido Trotter
51 a02b89cf Guido Trotter
def AsyncoreDelayFunction(timeout):
52 a02b89cf Guido Trotter
  """Asyncore-compatible scheduler delay function.
53 a02b89cf Guido Trotter

54 a02b89cf Guido Trotter
  This is a delay function for sched that, rather than actually sleeping,
55 a02b89cf Guido Trotter
  executes asyncore events happening in the meantime.
56 a02b89cf Guido Trotter

57 a02b89cf Guido Trotter
  After an event has occurred, rather than returning, it raises a
58 a02b89cf Guido Trotter
  SchedulerBreakout exception, which will force the current scheduler.run()
59 a02b89cf Guido Trotter
  invocation to terminate, so that we can also check for signals. The main loop
60 a02b89cf Guido Trotter
  will then call the scheduler run again, which will allow it to actually
61 a02b89cf Guido Trotter
  process any due events.
62 a02b89cf Guido Trotter

63 a02b89cf Guido Trotter
  This is needed because scheduler.run() doesn't support a count=..., as
64 a02b89cf Guido Trotter
  asyncore loop, and the scheduler module documents throwing exceptions from
65 a02b89cf Guido Trotter
  inside the delay function as an allowed usage model.
66 a02b89cf Guido Trotter

67 a02b89cf Guido Trotter
  """
68 a02b89cf Guido Trotter
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
69 a02b89cf Guido Trotter
  raise SchedulerBreakout()
70 a02b89cf Guido Trotter
71 a02b89cf Guido Trotter
72 a02b89cf Guido Trotter
class AsyncoreScheduler(sched.scheduler):
73 a02b89cf Guido Trotter
  """Event scheduler integrated with asyncore
74 a02b89cf Guido Trotter

75 a02b89cf Guido Trotter
  """
76 a02b89cf Guido Trotter
  def __init__(self, timefunc):
77 a02b89cf Guido Trotter
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
78 821d9e43 Michael Hanselmann
79 821d9e43 Michael Hanselmann
80 b11780bb Guido Trotter
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
81 b11780bb Guido Trotter
  """Base Ganeti Asyncore Dispacher
82 b11780bb Guido Trotter

83 b11780bb Guido Trotter
  """
84 b11780bb Guido Trotter
  # this method is overriding an asyncore.dispatcher method
85 b11780bb Guido Trotter
  def handle_error(self):
86 b11780bb Guido Trotter
    """Log an error in handling any request, and proceed.
87 b11780bb Guido Trotter

88 b11780bb Guido Trotter
    """
89 b11780bb Guido Trotter
    logging.exception("Error while handling asyncore request")
90 b11780bb Guido Trotter
91 b11780bb Guido Trotter
  # this method is overriding an asyncore.dispatcher method
92 b11780bb Guido Trotter
  def writable(self):
93 b11780bb Guido Trotter
    """Most of the time we don't want to check for writability.
94 b11780bb Guido Trotter

95 b11780bb Guido Trotter
    """
96 b11780bb Guido Trotter
    return False
97 b11780bb Guido Trotter
98 b11780bb Guido Trotter
99 a4b605ae Guido Trotter
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
100 a4b605ae Guido Trotter
  """A stream server to use with asyncore.
101 a4b605ae Guido Trotter

102 a4b605ae Guido Trotter
  Each request is accepted, and then dispatched to a separate asyncore
103 a4b605ae Guido Trotter
  dispatcher to handle.
104 a4b605ae Guido Trotter

105 a4b605ae Guido Trotter
  """
106 a4b605ae Guido Trotter
107 a4b605ae Guido Trotter
  _REQUEST_QUEUE_SIZE = 5
108 a4b605ae Guido Trotter
109 a4b605ae Guido Trotter
  def __init__(self, family, address):
110 a4b605ae Guido Trotter
    """Constructor for AsyncUnixStreamSocket
111 a4b605ae Guido Trotter

112 a4b605ae Guido Trotter
    @type family: integer
113 a4b605ae Guido Trotter
    @param family: socket family (one of socket.AF_*)
114 a4b605ae Guido Trotter
    @type address: address family dependent
115 a4b605ae Guido Trotter
    @param address: address to bind the socket to
116 a4b605ae Guido Trotter

117 a4b605ae Guido Trotter
    """
118 a4b605ae Guido Trotter
    GanetiBaseAsyncoreDispatcher.__init__(self)
119 a4b605ae Guido Trotter
    self.family = family
120 a4b605ae Guido Trotter
    self.create_socket(self.family, socket.SOCK_STREAM)
121 a4b605ae Guido Trotter
    self.set_reuse_addr()
122 a4b605ae Guido Trotter
    self.bind(address)
123 a4b605ae Guido Trotter
    self.listen(self._REQUEST_QUEUE_SIZE)
124 a4b605ae Guido Trotter
125 a4b605ae Guido Trotter
  # this method is overriding an asyncore.dispatcher method
126 a4b605ae Guido Trotter
  def handle_accept(self):
127 a4b605ae Guido Trotter
    """Accept a new client connection.
128 a4b605ae Guido Trotter

129 a4b605ae Guido Trotter
    Creates a new instance of the handler class, which will use asyncore to
130 a4b605ae Guido Trotter
    serve the client.
131 a4b605ae Guido Trotter

132 a4b605ae Guido Trotter
    """
133 a4b605ae Guido Trotter
    accept_result = utils.IgnoreSignals(self.accept)
134 a4b605ae Guido Trotter
    if accept_result is not None:
135 a4b605ae Guido Trotter
      connected_socket, client_address = accept_result
136 a4b605ae Guido Trotter
      if self.family == socket.AF_UNIX:
137 a4b605ae Guido Trotter
        # override the client address, as for unix sockets nothing meaningful
138 a4b605ae Guido Trotter
        # is passed in from accept anyway
139 a744b676 Manuel Franceschini
        client_address = netutils.GetSocketCredentials(connected_socket)
140 a4b605ae Guido Trotter
      logging.info("Accepted connection from %s",
141 981732fb Manuel Franceschini
                   netutils.FormatAddress(client_address, family=self.family))
142 a4b605ae Guido Trotter
      self.handle_connection(connected_socket, client_address)
143 a4b605ae Guido Trotter
144 a4b605ae Guido Trotter
  def handle_connection(self, connected_socket, client_address):
145 a4b605ae Guido Trotter
    """Handle an already accepted connection.
146 a4b605ae Guido Trotter

147 a4b605ae Guido Trotter
    """
148 a4b605ae Guido Trotter
    raise NotImplementedError
149 a4b605ae Guido Trotter
150 a4b605ae Guido Trotter
151 b66ab629 Guido Trotter
class AsyncTerminatedMessageStream(asynchat.async_chat):
152 b66ab629 Guido Trotter
  """A terminator separated message stream asyncore module.
153 b66ab629 Guido Trotter

154 b66ab629 Guido Trotter
  Handles a stream connection receiving messages terminated by a defined
155 b66ab629 Guido Trotter
  separator. For each complete message handle_message is called.
156 b66ab629 Guido Trotter

157 b66ab629 Guido Trotter
  """
158 37e62cb9 Guido Trotter
  def __init__(self, connected_socket, peer_address, terminator, family,
159 37e62cb9 Guido Trotter
               unhandled_limit):
160 b66ab629 Guido Trotter
    """AsyncTerminatedMessageStream constructor.
161 b66ab629 Guido Trotter

162 b66ab629 Guido Trotter
    @type connected_socket: socket.socket
163 b66ab629 Guido Trotter
    @param connected_socket: connected stream socket to receive messages from
164 b66ab629 Guido Trotter
    @param peer_address: family-specific peer address
165 b66ab629 Guido Trotter
    @type terminator: string
166 b66ab629 Guido Trotter
    @param terminator: terminator separating messages in the stream
167 b66ab629 Guido Trotter
    @type family: integer
168 b66ab629 Guido Trotter
    @param family: socket family
169 37e62cb9 Guido Trotter
    @type unhandled_limit: integer or None
170 37e62cb9 Guido Trotter
    @param unhandled_limit: maximum unanswered messages
171 b66ab629 Guido Trotter

172 b66ab629 Guido Trotter
    """
173 b66ab629 Guido Trotter
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
174 b66ab629 Guido Trotter
    # using a positional argument rather than a keyword one.
175 b66ab629 Guido Trotter
    asynchat.async_chat.__init__(self, connected_socket)
176 b66ab629 Guido Trotter
    self.connected_socket = connected_socket
177 b66ab629 Guido Trotter
    # on python 2.4 there is no "family" attribute for the socket class
178 b66ab629 Guido Trotter
    # FIXME: when we move to python 2.5 or above remove the family parameter
179 b66ab629 Guido Trotter
    #self.family = self.connected_socket.family
180 b66ab629 Guido Trotter
    self.family = family
181 b66ab629 Guido Trotter
    self.peer_address = peer_address
182 b66ab629 Guido Trotter
    self.terminator = terminator
183 37e62cb9 Guido Trotter
    self.unhandled_limit = unhandled_limit
184 b66ab629 Guido Trotter
    self.set_terminator(terminator)
185 b66ab629 Guido Trotter
    self.ibuffer = []
186 37e62cb9 Guido Trotter
    self.receive_count = 0
187 37e62cb9 Guido Trotter
    self.send_count = 0
188 1e063ccd Guido Trotter
    self.oqueue = collections.deque()
189 37e62cb9 Guido Trotter
    self.iqueue = collections.deque()
190 b66ab629 Guido Trotter
191 b66ab629 Guido Trotter
  # this method is overriding an asynchat.async_chat method
192 b66ab629 Guido Trotter
  def collect_incoming_data(self, data):
193 b66ab629 Guido Trotter
    self.ibuffer.append(data)
194 b66ab629 Guido Trotter
195 37e62cb9 Guido Trotter
  def _can_handle_message(self):
196 37e62cb9 Guido Trotter
    return (self.unhandled_limit is None or
197 37e62cb9 Guido Trotter
            (self.receive_count < self.send_count + self.unhandled_limit) and
198 37e62cb9 Guido Trotter
             not self.iqueue)
199 37e62cb9 Guido Trotter
200 b66ab629 Guido Trotter
  # this method is overriding an asynchat.async_chat method
201 b66ab629 Guido Trotter
  def found_terminator(self):
202 b66ab629 Guido Trotter
    message = "".join(self.ibuffer)
203 b66ab629 Guido Trotter
    self.ibuffer = []
204 37e62cb9 Guido Trotter
    message_id = self.receive_count
205 37e62cb9 Guido Trotter
    # We need to increase the receive_count after checking if the message can
206 37e62cb9 Guido Trotter
    # be handled, but before calling handle_message
207 37e62cb9 Guido Trotter
    can_handle = self._can_handle_message()
208 37e62cb9 Guido Trotter
    self.receive_count += 1
209 37e62cb9 Guido Trotter
    if can_handle:
210 37e62cb9 Guido Trotter
      self.handle_message(message, message_id)
211 37e62cb9 Guido Trotter
    else:
212 37e62cb9 Guido Trotter
      self.iqueue.append((message, message_id))
213 b66ab629 Guido Trotter
214 b66ab629 Guido Trotter
  def handle_message(self, message, message_id):
215 b66ab629 Guido Trotter
    """Handle a terminated message.
216 b66ab629 Guido Trotter

217 b66ab629 Guido Trotter
    @type message: string
218 b66ab629 Guido Trotter
    @param message: message to handle
219 b66ab629 Guido Trotter
    @type message_id: integer
220 b66ab629 Guido Trotter
    @param message_id: stream's message sequence number
221 b66ab629 Guido Trotter

222 b66ab629 Guido Trotter
    """
223 b66ab629 Guido Trotter
    pass
224 b66ab629 Guido Trotter
    # TODO: move this method to raise NotImplementedError
225 b66ab629 Guido Trotter
    # raise NotImplementedError
226 b66ab629 Guido Trotter
227 1e063ccd Guido Trotter
  def send_message(self, message):
228 1e063ccd Guido Trotter
    """Send a message to the remote peer. This function is thread-safe.
229 1e063ccd Guido Trotter

230 1e063ccd Guido Trotter
    @type message: string
231 1e063ccd Guido Trotter
    @param message: message to send, without the terminator
232 1e063ccd Guido Trotter

233 1e063ccd Guido Trotter
    @warning: If calling this function from a thread different than the one
234 1e063ccd Guido Trotter
    performing the main asyncore loop, remember that you have to wake that one
235 1e063ccd Guido Trotter
    up.
236 1e063ccd Guido Trotter

237 1e063ccd Guido Trotter
    """
238 1e063ccd Guido Trotter
    # If we just append the message we received to the output queue, this
239 1e063ccd Guido Trotter
    # function can be safely called by multiple threads at the same time, and
240 37e62cb9 Guido Trotter
    # we don't need locking, since deques are thread safe. handle_write in the
241 37e62cb9 Guido Trotter
    # asyncore thread will handle the next input message if there are any
242 37e62cb9 Guido Trotter
    # enqueued.
243 1e063ccd Guido Trotter
    self.oqueue.append(message)
244 1e063ccd Guido Trotter
245 1e063ccd Guido Trotter
  # this method is overriding an asyncore.dispatcher method
246 37e62cb9 Guido Trotter
  def readable(self):
247 37e62cb9 Guido Trotter
    # read from the socket if we can handle the next requests
248 37e62cb9 Guido Trotter
    return self._can_handle_message() and asynchat.async_chat.readable(self)
249 37e62cb9 Guido Trotter
250 37e62cb9 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
251 1e063ccd Guido Trotter
  def writable(self):
252 1e063ccd Guido Trotter
    # the output queue may become full just after we called writable. This only
253 1e063ccd Guido Trotter
    # works if we know we'll have something else waking us up from the select,
254 1e063ccd Guido Trotter
    # in such case, anyway.
255 1e063ccd Guido Trotter
    return asynchat.async_chat.writable(self) or self.oqueue
256 1e063ccd Guido Trotter
257 1e063ccd Guido Trotter
  # this method is overriding an asyncore.dispatcher method
258 1e063ccd Guido Trotter
  def handle_write(self):
259 1e063ccd Guido Trotter
    if self.oqueue:
260 37e62cb9 Guido Trotter
      # if we have data in the output queue, then send_message was called.
261 37e62cb9 Guido Trotter
      # this means we can process one more message from the input queue, if
262 37e62cb9 Guido Trotter
      # there are any.
263 1e063ccd Guido Trotter
      data = self.oqueue.popleft()
264 1e063ccd Guido Trotter
      self.push(data + self.terminator)
265 37e62cb9 Guido Trotter
      self.send_count += 1
266 37e62cb9 Guido Trotter
      if self.iqueue:
267 37e62cb9 Guido Trotter
        self.handle_message(*self.iqueue.popleft())
268 1e063ccd Guido Trotter
    self.initiate_send()
269 1e063ccd Guido Trotter
270 b66ab629 Guido Trotter
  def close_log(self):
271 b66ab629 Guido Trotter
    logging.info("Closing connection from %s",
272 981732fb Manuel Franceschini
                 netutils.FormatAddress(self.peer_address, family=self.family))
273 b66ab629 Guido Trotter
    self.close()
274 b66ab629 Guido Trotter
275 b66ab629 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
276 b66ab629 Guido Trotter
  def handle_expt(self):
277 b66ab629 Guido Trotter
    self.close_log()
278 b66ab629 Guido Trotter
279 b66ab629 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
280 b66ab629 Guido Trotter
  def handle_error(self):
281 b66ab629 Guido Trotter
    """Log an error in handling any request, and proceed.
282 b66ab629 Guido Trotter

283 b66ab629 Guido Trotter
    """
284 b66ab629 Guido Trotter
    logging.exception("Error while handling asyncore request")
285 b66ab629 Guido Trotter
    self.close_log()
286 b66ab629 Guido Trotter
287 b66ab629 Guido Trotter
288 b11780bb Guido Trotter
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
289 5f3269fc Guido Trotter
  """An improved asyncore udp socket.
290 5f3269fc Guido Trotter

291 5f3269fc Guido Trotter
  """
292 d8bcfe21 Manuel Franceschini
  def __init__(self, family):
293 5f3269fc Guido Trotter
    """Constructor for AsyncUDPSocket
294 5f3269fc Guido Trotter

295 5f3269fc Guido Trotter
    """
296 b11780bb Guido Trotter
    GanetiBaseAsyncoreDispatcher.__init__(self)
297 5f3269fc Guido Trotter
    self._out_queue = []
298 d8bcfe21 Manuel Franceschini
    self._family = family
299 d8bcfe21 Manuel Franceschini
    self.create_socket(family, socket.SOCK_DGRAM)
300 5f3269fc Guido Trotter
301 5f3269fc Guido Trotter
  # this method is overriding an asyncore.dispatcher method
302 5f3269fc Guido Trotter
  def handle_connect(self):
303 5f3269fc Guido Trotter
    # Python thinks that the first udp message from a source qualifies as a
304 5f3269fc Guido Trotter
    # "connect" and further ones are part of the same connection. We beg to
305 5f3269fc Guido Trotter
    # differ and treat all messages equally.
306 5f3269fc Guido Trotter
    pass
307 5f3269fc Guido Trotter
308 5f3269fc Guido Trotter
  # this method is overriding an asyncore.dispatcher method
309 5f3269fc Guido Trotter
  def handle_read(self):
310 6e7e58b4 Guido Trotter
    recv_result = utils.IgnoreSignals(self.recvfrom,
311 6e7e58b4 Guido Trotter
                                      constants.MAX_UDP_DATA_SIZE)
312 6e7e58b4 Guido Trotter
    if recv_result is not None:
313 6e7e58b4 Guido Trotter
      payload, address = recv_result
314 d8bcfe21 Manuel Franceschini
      if self._family == socket.AF_INET6:
315 d8bcfe21 Manuel Franceschini
        # we ignore 'flow info' and 'scope id' as we don't need them
316 d8bcfe21 Manuel Franceschini
        ip, port, _, _ = address
317 d8bcfe21 Manuel Franceschini
      else:
318 d8bcfe21 Manuel Franceschini
        ip, port = address
319 d8bcfe21 Manuel Franceschini
320 6e7e58b4 Guido Trotter
      self.handle_datagram(payload, ip, port)
321 5f3269fc Guido Trotter
322 5f3269fc Guido Trotter
  def handle_datagram(self, payload, ip, port):
323 5f3269fc Guido Trotter
    """Handle an already read udp datagram
324 5f3269fc Guido Trotter

325 5f3269fc Guido Trotter
    """
326 5f3269fc Guido Trotter
    raise NotImplementedError
327 5f3269fc Guido Trotter
328 5f3269fc Guido Trotter
  # this method is overriding an asyncore.dispatcher method
329 5f3269fc Guido Trotter
  def writable(self):
330 5f3269fc Guido Trotter
    # We should check whether we can write to the socket only if we have
331 5f3269fc Guido Trotter
    # something scheduled to be written
332 5f3269fc Guido Trotter
    return bool(self._out_queue)
333 5f3269fc Guido Trotter
334 48bf6352 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
335 5f3269fc Guido Trotter
  def handle_write(self):
336 3660fcf5 Guido Trotter
    if not self._out_queue:
337 3660fcf5 Guido Trotter
      logging.error("handle_write called with empty output queue")
338 3660fcf5 Guido Trotter
      return
339 3660fcf5 Guido Trotter
    (ip, port, payload) = self._out_queue[0]
340 232144d0 Guido Trotter
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
341 3660fcf5 Guido Trotter
    self._out_queue.pop(0)
342 3660fcf5 Guido Trotter
343 5f3269fc Guido Trotter
  def enqueue_send(self, ip, port, payload):
344 5f3269fc Guido Trotter
    """Enqueue a datagram to be sent when possible
345 5f3269fc Guido Trotter

346 5f3269fc Guido Trotter
    """
347 c8eded0b Guido Trotter
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
348 c8eded0b Guido Trotter
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
349 c8eded0b Guido Trotter
                                    constants.MAX_UDP_DATA_SIZE))
350 5f3269fc Guido Trotter
    self._out_queue.append((ip, port, payload))
351 5f3269fc Guido Trotter
352 6ddf5c8f Guido Trotter
  def process_next_packet(self, timeout=0):
353 6ddf5c8f Guido Trotter
    """Process the next datagram, waiting for it if necessary.
354 6ddf5c8f Guido Trotter

355 6ddf5c8f Guido Trotter
    @type timeout: float
356 6ddf5c8f Guido Trotter
    @param timeout: how long to wait for data
357 6ddf5c8f Guido Trotter
    @rtype: boolean
358 6ddf5c8f Guido Trotter
    @return: True if some data has been handled, False otherwise
359 6ddf5c8f Guido Trotter

360 6ddf5c8f Guido Trotter
    """
361 1b429e2a Iustin Pop
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
362 1b429e2a Iustin Pop
    if result is not None and result & select.POLLIN:
363 3660fcf5 Guido Trotter
      self.handle_read()
364 6ddf5c8f Guido Trotter
      return True
365 6ddf5c8f Guido Trotter
    else:
366 6ddf5c8f Guido Trotter
      return False
367 6ddf5c8f Guido Trotter
368 5f3269fc Guido Trotter
369 495ba852 Guido Trotter
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
370 495ba852 Guido Trotter
  """A way to notify the asyncore loop that something is going on.
371 495ba852 Guido Trotter

372 495ba852 Guido Trotter
  If an asyncore daemon is multithreaded when a thread tries to push some data
373 495ba852 Guido Trotter
  to a socket, the main loop handling asynchronous requests might be sleeping
374 495ba852 Guido Trotter
  waiting on a select(). To avoid this it can create an instance of the
375 495ba852 Guido Trotter
  AsyncAwaker, which other threads can use to wake it up.
376 495ba852 Guido Trotter

377 495ba852 Guido Trotter
  """
378 495ba852 Guido Trotter
  def __init__(self, signal_fn=None):
379 495ba852 Guido Trotter
    """Constructor for AsyncAwaker
380 495ba852 Guido Trotter

381 495ba852 Guido Trotter
    @type signal_fn: function
382 495ba852 Guido Trotter
    @param signal_fn: function to call when awaken
383 495ba852 Guido Trotter

384 495ba852 Guido Trotter
    """
385 495ba852 Guido Trotter
    GanetiBaseAsyncoreDispatcher.__init__(self)
386 495ba852 Guido Trotter
    assert signal_fn == None or callable(signal_fn)
387 495ba852 Guido Trotter
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
388 495ba852 Guido Trotter
                                                          socket.SOCK_STREAM)
389 495ba852 Guido Trotter
    self.in_socket.setblocking(0)
390 b628191f Guido Trotter
    self.in_socket.shutdown(socket.SHUT_WR)
391 b628191f Guido Trotter
    self.out_socket.shutdown(socket.SHUT_RD)
392 495ba852 Guido Trotter
    self.set_socket(self.in_socket)
393 495ba852 Guido Trotter
    self.need_signal = True
394 495ba852 Guido Trotter
    self.signal_fn = signal_fn
395 495ba852 Guido Trotter
    self.connected = True
396 495ba852 Guido Trotter
397 495ba852 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
398 495ba852 Guido Trotter
  def handle_read(self):
399 495ba852 Guido Trotter
    utils.IgnoreSignals(self.recv, 4096)
400 495ba852 Guido Trotter
    if self.signal_fn:
401 495ba852 Guido Trotter
      self.signal_fn()
402 495ba852 Guido Trotter
    self.need_signal = True
403 495ba852 Guido Trotter
404 495ba852 Guido Trotter
  # this method is overriding an asyncore.dispatcher method
405 495ba852 Guido Trotter
  def close(self):
406 495ba852 Guido Trotter
    asyncore.dispatcher.close(self)
407 495ba852 Guido Trotter
    self.out_socket.close()
408 495ba852 Guido Trotter
409 495ba852 Guido Trotter
  def signal(self):
410 495ba852 Guido Trotter
    """Signal the asyncore main loop.
411 495ba852 Guido Trotter

412 495ba852 Guido Trotter
    Any data we send here will be ignored, but it will cause the select() call
413 495ba852 Guido Trotter
    to return.
414 495ba852 Guido Trotter

415 495ba852 Guido Trotter
    """
416 495ba852 Guido Trotter
    # Yes, there is a race condition here. No, we don't care, at worst we're
417 495ba852 Guido Trotter
    # sending more than one wakeup token, which doesn't harm at all.
418 495ba852 Guido Trotter
    if self.need_signal:
419 495ba852 Guido Trotter
      self.need_signal = False
420 495ba852 Guido Trotter
      self.out_socket.send("\0")
421 495ba852 Guido Trotter
422 495ba852 Guido Trotter
423 821d9e43 Michael Hanselmann
class Mainloop(object):
424 821d9e43 Michael Hanselmann
  """Generic mainloop for daemons
425 821d9e43 Michael Hanselmann

426 69b99987 Michael Hanselmann
  @ivar scheduler: A sched.scheduler object, which can be used to register
427 69b99987 Michael Hanselmann
    timed events
428 69b99987 Michael Hanselmann

429 821d9e43 Michael Hanselmann
  """
430 821d9e43 Michael Hanselmann
  def __init__(self):
431 b14b975f Michael Hanselmann
    """Constructs a new Mainloop instance.
432 b14b975f Michael Hanselmann

433 b14b975f Michael Hanselmann
    """
434 821d9e43 Michael Hanselmann
    self._signal_wait = []
435 a02b89cf Guido Trotter
    self.scheduler = AsyncoreScheduler(time.time)
436 821d9e43 Michael Hanselmann
437 9b739173 Guido Trotter
  @utils.SignalHandled([signal.SIGCHLD])
438 9b739173 Guido Trotter
  @utils.SignalHandled([signal.SIGTERM])
439 f59dce3e Guido Trotter
  @utils.SignalHandled([signal.SIGINT])
440 69b99987 Michael Hanselmann
  def Run(self, signal_handlers=None):
441 b14b975f Michael Hanselmann
    """Runs the mainloop.
442 b14b975f Michael Hanselmann

443 9b739173 Guido Trotter
    @type signal_handlers: dict
444 9b739173 Guido Trotter
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
445 b14b975f Michael Hanselmann

446 b14b975f Michael Hanselmann
    """
447 9b739173 Guido Trotter
    assert isinstance(signal_handlers, dict) and \
448 9b739173 Guido Trotter
           len(signal_handlers) > 0, \
449 9b739173 Guido Trotter
           "Broken SignalHandled decorator"
450 9b739173 Guido Trotter
    running = True
451 9b739173 Guido Trotter
    # Start actual main loop
452 9b739173 Guido Trotter
    while running:
453 a02b89cf Guido Trotter
      if not self.scheduler.empty():
454 a02b89cf Guido Trotter
        try:
455 a02b89cf Guido Trotter
          self.scheduler.run()
456 a02b89cf Guido Trotter
        except SchedulerBreakout:
457 a02b89cf Guido Trotter
          pass
458 a02b89cf Guido Trotter
      else:
459 a02b89cf Guido Trotter
        asyncore.loop(count=1, use_poll=True)
460 9b739173 Guido Trotter
461 9b739173 Guido Trotter
      # Check whether a signal was raised
462 9b739173 Guido Trotter
      for sig in signal_handlers:
463 9b739173 Guido Trotter
        handler = signal_handlers[sig]
464 9b739173 Guido Trotter
        if handler.called:
465 9b739173 Guido Trotter
          self._CallSignalWaiters(sig)
466 f59dce3e Guido Trotter
          running = sig not in (signal.SIGTERM, signal.SIGINT)
467 9b739173 Guido Trotter
          handler.Clear()
468 a570e2a8 Guido Trotter
469 b14b975f Michael Hanselmann
  def _CallSignalWaiters(self, signum):
470 b14b975f Michael Hanselmann
    """Calls all signal waiters for a certain signal.
471 b14b975f Michael Hanselmann

472 b14b975f Michael Hanselmann
    @type signum: int
473 b14b975f Michael Hanselmann
    @param signum: Signal number
474 b14b975f Michael Hanselmann

475 b14b975f Michael Hanselmann
    """
476 b14b975f Michael Hanselmann
    for owner in self._signal_wait:
477 a9fe7232 Guido Trotter
      owner.OnSignal(signum)
478 821d9e43 Michael Hanselmann
479 821d9e43 Michael Hanselmann
  def RegisterSignal(self, owner):
480 821d9e43 Michael Hanselmann
    """Registers a receiver for signal notifications
481 821d9e43 Michael Hanselmann

482 821d9e43 Michael Hanselmann
    The receiver must support a "OnSignal(self, signum)" function.
483 821d9e43 Michael Hanselmann

484 821d9e43 Michael Hanselmann
    @type owner: instance
485 821d9e43 Michael Hanselmann
    @param owner: Receiver
486 821d9e43 Michael Hanselmann

487 821d9e43 Michael Hanselmann
    """
488 821d9e43 Michael Hanselmann
    self._signal_wait.append(owner)
489 b11c9e5c Michael Hanselmann
490 04ccf5e9 Guido Trotter
491 f4ec2960 René Nussbaumer
def _VerifyDaemonUser(daemon_name):
492 f4ec2960 René Nussbaumer
  """Verifies the process uid matches the configured uid.
493 f4ec2960 René Nussbaumer

494 3e87c1bf Iustin Pop
  This method verifies that a daemon is started as the user it is
495 3e87c1bf Iustin Pop
  intended to be run
496 f4ec2960 René Nussbaumer

497 f4ec2960 René Nussbaumer
  @param daemon_name: The name of daemon to be started
498 f4ec2960 René Nussbaumer
  @return: A tuple with the first item indicating success or not,
499 f4ec2960 René Nussbaumer
           the second item current uid and third with expected uid
500 f4ec2960 René Nussbaumer

501 f4ec2960 René Nussbaumer
  """
502 f4ec2960 René Nussbaumer
  getents = runtime.GetEnts()
503 f4ec2960 René Nussbaumer
  running_uid = os.getuid()
504 f4ec2960 René Nussbaumer
  daemon_uids = {
505 f4ec2960 René Nussbaumer
    constants.MASTERD: getents.masterd_uid,
506 f4ec2960 René Nussbaumer
    constants.RAPI: getents.rapi_uid,
507 f4ec2960 René Nussbaumer
    constants.NODED: getents.noded_uid,
508 f4ec2960 René Nussbaumer
    constants.CONFD: getents.confd_uid,
509 f4ec2960 René Nussbaumer
    }
510 f4ec2960 René Nussbaumer
511 f4ec2960 René Nussbaumer
  return (daemon_uids[daemon_name] == running_uid, running_uid,
512 f4ec2960 René Nussbaumer
          daemon_uids[daemon_name])
513 f4ec2960 René Nussbaumer
514 f4ec2960 René Nussbaumer
515 3e87c1bf Iustin Pop
def _BeautifyError(err):
516 3e87c1bf Iustin Pop
  """Try to format an error better.
517 3e87c1bf Iustin Pop

518 3e87c1bf Iustin Pop
  Since we're dealing with daemon startup errors, in many cases this
519 3e87c1bf Iustin Pop
  will be due to socket error and such, so we try to format these cases better.
520 3e87c1bf Iustin Pop

521 3e87c1bf Iustin Pop
  @param err: an exception object
522 3e87c1bf Iustin Pop
  @rtype: string
523 3e87c1bf Iustin Pop
  @return: the formatted error description
524 3e87c1bf Iustin Pop

525 3e87c1bf Iustin Pop
  """
526 3e87c1bf Iustin Pop
  try:
527 3e87c1bf Iustin Pop
    if isinstance(err, socket.error):
528 3e87c1bf Iustin Pop
      return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
529 3e87c1bf Iustin Pop
    elif isinstance(err, EnvironmentError):
530 3e87c1bf Iustin Pop
      if err.filename is None:
531 3e87c1bf Iustin Pop
        return "%s (errno=%s)" % (err.strerror, err.errno)
532 3e87c1bf Iustin Pop
      else:
533 3e87c1bf Iustin Pop
        return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
534 3e87c1bf Iustin Pop
                                            err.errno)
535 3e87c1bf Iustin Pop
    else:
536 3e87c1bf Iustin Pop
      return str(err)
537 3e87c1bf Iustin Pop
  except Exception: # pylint: disable-msg=W0703
538 3e87c1bf Iustin Pop
    logging.exception("Error while handling existing error %s", err)
539 3e87c1bf Iustin Pop
    return "%s" % str(err)
540 3e87c1bf Iustin Pop
541 3e87c1bf Iustin Pop
542 b42ea9ed Iustin Pop
def GenericMain(daemon_name, optionparser,
543 b42ea9ed Iustin Pop
                check_fn, prepare_fn, exec_fn,
544 1c54156d Luca Bigliardi
                multithreaded=False, console_logging=False,
545 0070a462 René Nussbaumer
                default_ssl_cert=None, default_ssl_key=None):
546 04ccf5e9 Guido Trotter
  """Shared main function for daemons.
547 04ccf5e9 Guido Trotter

548 04ccf5e9 Guido Trotter
  @type daemon_name: string
549 04ccf5e9 Guido Trotter
  @param daemon_name: daemon name
550 69b99987 Michael Hanselmann
  @type optionparser: optparse.OptionParser
551 04ccf5e9 Guido Trotter
  @param optionparser: initialized optionparser with daemon-specific options
552 04ccf5e9 Guido Trotter
                       (common -f -d options will be handled by this module)
553 04ccf5e9 Guido Trotter
  @type check_fn: function which accepts (options, args)
554 04ccf5e9 Guido Trotter
  @param check_fn: function that checks start conditions and exits if they're
555 04ccf5e9 Guido Trotter
                   not met
556 b42ea9ed Iustin Pop
  @type prepare_fn: function which accepts (options, args)
557 b42ea9ed Iustin Pop
  @param prepare_fn: function that is run before forking, or None;
558 b42ea9ed Iustin Pop
      it's result will be passed as the third parameter to exec_fn, or
559 b42ea9ed Iustin Pop
      if None was passed in, we will just pass None to exec_fn
560 b42ea9ed Iustin Pop
  @type exec_fn: function which accepts (options, args, prepare_results)
561 04ccf5e9 Guido Trotter
  @param exec_fn: function that's executed with the daemon's pid file held, and
562 04ccf5e9 Guido Trotter
                  runs the daemon itself.
563 30dabd03 Michael Hanselmann
  @type multithreaded: bool
564 30dabd03 Michael Hanselmann
  @param multithreaded: Whether the daemon uses threads
565 ff917534 Luca Bigliardi
  @type console_logging: boolean
566 ff917534 Luca Bigliardi
  @param console_logging: if True, the daemon will fall back to the system
567 ff917534 Luca Bigliardi
                          console if logging fails
568 0648750e Michael Hanselmann
  @type default_ssl_cert: string
569 0648750e Michael Hanselmann
  @param default_ssl_cert: Default SSL certificate path
570 0648750e Michael Hanselmann
  @type default_ssl_key: string
571 0648750e Michael Hanselmann
  @param default_ssl_key: Default SSL key path
572 04ccf5e9 Guido Trotter

573 04ccf5e9 Guido Trotter
  """
574 04ccf5e9 Guido Trotter
  optionparser.add_option("-f", "--foreground", dest="fork",
575 04ccf5e9 Guido Trotter
                          help="Don't detach from the current terminal",
576 04ccf5e9 Guido Trotter
                          default=True, action="store_false")
577 04ccf5e9 Guido Trotter
  optionparser.add_option("-d", "--debug", dest="debug",
578 04ccf5e9 Guido Trotter
                          help="Enable some debug messages",
579 04ccf5e9 Guido Trotter
                          default=False, action="store_true")
580 551b6283 Iustin Pop
  optionparser.add_option("--syslog", dest="syslog",
581 551b6283 Iustin Pop
                          help="Enable logging to syslog (except debug"
582 551b6283 Iustin Pop
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
583 551b6283 Iustin Pop
                          constants.SYSLOG_USAGE,
584 551b6283 Iustin Pop
                          default=constants.SYSLOG_USAGE,
585 551b6283 Iustin Pop
                          choices=["no", "yes", "only"])
586 0a71aa17 Michael Hanselmann
587 04ccf5e9 Guido Trotter
  if daemon_name in constants.DAEMONS_PORTS:
588 14f5f1b6 Manuel Franceschini
    default_bind_address = constants.IP4_ADDRESS_ANY
589 7dd999fc Manuel Franceschini
    family = ssconf.SimpleStore().GetPrimaryIPFamily()
590 7dd999fc Manuel Franceschini
    # family will default to AF_INET if there is no ssconf file (e.g. when
591 7dd999fc Manuel Franceschini
    # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
592 7dd999fc Manuel Franceschini
    # <= 2.2 can not be AF_INET6
593 7dd999fc Manuel Franceschini
    if family == netutils.IP6Address.family:
594 7dd999fc Manuel Franceschini
      default_bind_address = constants.IP6_ADDRESS_ANY
595 e7323b5e Manuel Franceschini
596 a744b676 Manuel Franceschini
    default_port = netutils.GetDaemonPort(daemon_name)
597 0a71aa17 Michael Hanselmann
598 0a71aa17 Michael Hanselmann
    # For networked daemons we allow choosing the port and bind address
599 04ccf5e9 Guido Trotter
    optionparser.add_option("-p", "--port", dest="port",
600 0a71aa17 Michael Hanselmann
                            help="Network port (default: %s)" % default_port,
601 0a71aa17 Michael Hanselmann
                            default=default_port, type="int")
602 04ccf5e9 Guido Trotter
    optionparser.add_option("-b", "--bind", dest="bind_address",
603 e7323b5e Manuel Franceschini
                            help=("Bind address (default: '%s')" %
604 0a71aa17 Michael Hanselmann
                                  default_bind_address),
605 0a71aa17 Michael Hanselmann
                            default=default_bind_address, metavar="ADDRESS")
606 04ccf5e9 Guido Trotter
607 0648750e Michael Hanselmann
  if default_ssl_key is not None and default_ssl_cert is not None:
608 3b1b0cb6 Guido Trotter
    optionparser.add_option("--no-ssl", dest="ssl",
609 3b1b0cb6 Guido Trotter
                            help="Do not secure HTTP protocol with SSL",
610 3b1b0cb6 Guido Trotter
                            default=True, action="store_false")
611 3b1b0cb6 Guido Trotter
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
612 0648750e Michael Hanselmann
                            help=("SSL key path (default: %s)" %
613 0648750e Michael Hanselmann
                                  default_ssl_key),
614 0648750e Michael Hanselmann
                            default=default_ssl_key, type="string",
615 0648750e Michael Hanselmann
                            metavar="SSL_KEY_PATH")
616 3b1b0cb6 Guido Trotter
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
617 0648750e Michael Hanselmann
                            help=("SSL certificate path (default: %s)" %
618 0648750e Michael Hanselmann
                                  default_ssl_cert),
619 0648750e Michael Hanselmann
                            default=default_ssl_cert, type="string",
620 0648750e Michael Hanselmann
                            metavar="SSL_CERT_PATH")
621 3b1b0cb6 Guido Trotter
622 30dabd03 Michael Hanselmann
  # Disable the use of fork(2) if the daemon uses threads
623 30dabd03 Michael Hanselmann
  utils.no_fork = multithreaded
624 04ccf5e9 Guido Trotter
625 04ccf5e9 Guido Trotter
  options, args = optionparser.parse_args()
626 04ccf5e9 Guido Trotter
627 0648750e Michael Hanselmann
  if getattr(options, "ssl", False):
628 0648750e Michael Hanselmann
    ssl_paths = {
629 0648750e Michael Hanselmann
      "certificate": options.ssl_cert,
630 0648750e Michael Hanselmann
      "key": options.ssl_key,
631 0648750e Michael Hanselmann
      }
632 0648750e Michael Hanselmann
633 0648750e Michael Hanselmann
    for name, path in ssl_paths.iteritems():
634 0648750e Michael Hanselmann
      if not os.path.isfile(path):
635 0648750e Michael Hanselmann
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
636 3b1b0cb6 Guido Trotter
        sys.exit(constants.EXIT_FAILURE)
637 3b1b0cb6 Guido Trotter
638 0648750e Michael Hanselmann
    # TODO: By initiating http.HttpSslParams here we would only read the files
639 0648750e Michael Hanselmann
    # once and have a proper validation (isfile returns False on directories)
640 0648750e Michael Hanselmann
    # at the same time.
641 0648750e Michael Hanselmann
642 f4ec2960 René Nussbaumer
  result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
643 f4ec2960 René Nussbaumer
  if not result:
644 f4ec2960 René Nussbaumer
    msg = ("%s started using wrong user ID (%d), expected %d" %
645 f4ec2960 René Nussbaumer
           (daemon_name, running_uid, expected_uid))
646 f4ec2960 René Nussbaumer
    print >> sys.stderr, msg
647 f4ec2960 René Nussbaumer
    sys.exit(constants.EXIT_FAILURE)
648 f4ec2960 René Nussbaumer
649 3b1b0cb6 Guido Trotter
  if check_fn is not None:
650 3b1b0cb6 Guido Trotter
    check_fn(options, args)
651 3b1b0cb6 Guido Trotter
652 04ccf5e9 Guido Trotter
  if options.fork:
653 04ccf5e9 Guido Trotter
    utils.CloseFDs()
654 b78aa8c2 Iustin Pop
    wpipe = utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
655 b78aa8c2 Iustin Pop
  else:
656 b78aa8c2 Iustin Pop
    wpipe = None
657 04ccf5e9 Guido Trotter
658 5c4d37f9 Iustin Pop
  utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
659 04ccf5e9 Guido Trotter
  try:
660 b78aa8c2 Iustin Pop
    try:
661 b78aa8c2 Iustin Pop
      utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
662 b78aa8c2 Iustin Pop
                         debug=options.debug,
663 b78aa8c2 Iustin Pop
                         stderr_logging=not options.fork,
664 b78aa8c2 Iustin Pop
                         multithreaded=multithreaded,
665 b78aa8c2 Iustin Pop
                         program=daemon_name,
666 b78aa8c2 Iustin Pop
                         syslog=options.syslog,
667 b78aa8c2 Iustin Pop
                         console_logging=console_logging)
668 b78aa8c2 Iustin Pop
      if callable(prepare_fn):
669 b78aa8c2 Iustin Pop
        prep_results = prepare_fn(options, args)
670 b78aa8c2 Iustin Pop
      else:
671 b78aa8c2 Iustin Pop
        prep_results = None
672 b78aa8c2 Iustin Pop
      logging.info("%s daemon startup", daemon_name)
673 b78aa8c2 Iustin Pop
    except Exception, err:
674 ed3920e3 Iustin Pop
      utils.WriteErrorToFD(wpipe, _BeautifyError(err))
675 b78aa8c2 Iustin Pop
      raise
676 b78aa8c2 Iustin Pop
677 b78aa8c2 Iustin Pop
    if wpipe is not None:
678 b78aa8c2 Iustin Pop
      # we're done with the preparation phase, we close the pipe to
679 b78aa8c2 Iustin Pop
      # let the parent know it's safe to exit
680 b78aa8c2 Iustin Pop
      os.close(wpipe)
681 b42ea9ed Iustin Pop
682 b42ea9ed Iustin Pop
    exec_fn(options, args, prep_results)
683 04ccf5e9 Guido Trotter
  finally:
684 04ccf5e9 Guido Trotter
    utils.RemovePidFile(daemon_name)