Modify cli.JobExecutor to use SubmitManyJobs
[ganeti-local] / lib / daemon.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module with helper classes and functions for daemons"""
23
24
25 import select
26 import signal
27 import errno
28 import time
29
30 from ganeti import utils
31
32
33 class Timer(object):
34   def __init__(self, owner, timer_id, start, interval, repeat):
35     self.owner = owner
36     self.timer_id = timer_id
37     self.start = start
38     self.interval = interval
39     self.repeat = repeat
40
41
42 class Mainloop(object):
43   """Generic mainloop for daemons
44
45   """
46   def __init__(self):
47     """Constructs a new Mainloop instance.
48
49     """
50     self._io_wait = {}
51     self._io_wait_add = []
52     self._io_wait_remove = []
53     self._signal_wait = []
54     self._timer_id_last = 0
55     self._timer = {}
56     self._timer_add = []
57     self._timer_remove = []
58
59   def Run(self, handle_sigchld=True, handle_sigterm=True, stop_on_empty=False):
60     """Runs the mainloop.
61
62     @type handle_sigchld: bool
63     @param handle_sigchld: Whether to install handler for SIGCHLD
64     @type handle_sigterm: bool
65     @param handle_sigterm: Whether to install handler for SIGTERM
66     @type stop_on_empty: bool
67     @param stop_on_empty: Whether to stop mainloop once all I/O waiters
68                           unregistered
69
70     """
71     poller = select.poll()
72
73     # Setup signal handlers
74     if handle_sigchld:
75       sigchld_handler = utils.SignalHandler([signal.SIGCHLD])
76     else:
77       sigchld_handler = None
78     try:
79       if handle_sigterm:
80         sigterm_handler = utils.SignalHandler([signal.SIGTERM])
81       else:
82         sigterm_handler = None
83
84       try:
85         running = True
86         timeout = None
87         timeout_needs_update = True
88
89         # Start actual main loop
90         while running:
91           # Entries could be added again afterwards, hence removing first
92           if self._io_wait_remove:
93             for fd in self._io_wait_remove:
94               try:
95                 poller.unregister(fd)
96               except KeyError:
97                 pass
98               try:
99                 del self._io_wait[fd]
100               except KeyError:
101                 pass
102             self._io_wait_remove = []
103
104           # Add new entries
105           if self._io_wait_add:
106             for (owner, fd, conditions) in self._io_wait_add:
107               self._io_wait[fd] = owner
108               poller.register(fd, conditions)
109             self._io_wait_add = []
110
111           # Add new timers
112           if self._timer_add:
113             timeout_needs_update = True
114             for timer in self._timer_add:
115               self._timer[timer.timer_id] = timer
116             del self._timer_add[:]
117
118           # Remove timers
119           if self._timer_remove:
120             timeout_needs_update = True
121             for timer_id in self._timer_remove:
122               try:
123                 del self._timer[timer_id]
124               except KeyError:
125                 pass
126             del self._timer_remove[:]
127
128           # Stop if nothing is listening anymore
129           if stop_on_empty and not (self._io_wait or self._timer):
130             break
131
132           # Calculate timeout again if required
133           if timeout_needs_update:
134             timeout = self._CalcTimeout(time.time())
135             timeout_needs_update = False
136
137           # Wait for I/O events
138           try:
139             io_events = poller.poll(timeout)
140           except select.error, err:
141             # EINTR can happen when signals are sent
142             if err.args and err.args[0] in (errno.EINTR,):
143               io_events = None
144             else:
145               raise
146
147           after_poll = time.time()
148
149           if io_events:
150             # Check for I/O events
151             for (evfd, evcond) in io_events:
152               owner = self._io_wait.get(evfd, None)
153               if owner:
154                 owner.OnIO(evfd, evcond)
155
156           if self._timer:
157             self._CheckTimers(after_poll)
158
159           # Check whether signal was raised
160           if sigchld_handler and sigchld_handler.called:
161             self._CallSignalWaiters(signal.SIGCHLD)
162             sigchld_handler.Clear()
163
164           if sigterm_handler and sigterm_handler.called:
165             self._CallSignalWaiters(signal.SIGTERM)
166             running = False
167             sigterm_handler.Clear()
168       finally:
169         # Restore signal handlers
170         if sigterm_handler:
171           sigterm_handler.Reset()
172     finally:
173       if sigchld_handler:
174         sigchld_handler.Reset()
175
176   def _CalcTimeout(self, now):
177     if not self._timer:
178       return None
179
180     timeout = None
181
182     # TODO: Repeating timers
183
184     min_timeout = 0.001
185
186     for timer in self._timer.itervalues():
187       time_left = (timer.start + timer.interval) - now
188       if timeout is None or time_left < timeout:
189         timeout = time_left
190       if timeout < 0:
191         timeout = 0
192         break
193       elif timeout < min_timeout:
194         timeout = min_timeout
195         break
196
197     return timeout * 1000.0
198
199   def _CheckTimers(self, now):
200     # TODO: Repeating timers
201     for timer in self._timer.itervalues():
202       if now < (timer.start + timer.interval):
203         continue
204
205       timer.owner.OnTimer(timer.timer_id)
206
207       # TODO: Repeating timers should not be removed
208       self._timer_remove.append(timer.timer_id)
209
210   def _CallSignalWaiters(self, signum):
211     """Calls all signal waiters for a certain signal.
212
213     @type signum: int
214     @param signum: Signal number
215
216     """
217     for owner in self._signal_wait:
218       owner.OnSignal(signal.SIGCHLD)
219
220   def RegisterIO(self, owner, fd, condition):
221     """Registers a receiver for I/O notifications
222
223     The receiver must support a "OnIO(self, fd, conditions)" function.
224
225     @type owner: instance
226     @param owner: Receiver
227     @type fd: int
228     @param fd: File descriptor
229     @type condition: int
230     @param condition: ORed field of conditions to be notified
231                       (see select module)
232
233     """
234     # select.Poller also supports file() like objects, but we don't.
235     assert isinstance(fd, (int, long)), \
236       "Only integers are supported for file descriptors"
237
238     self._io_wait_add.append((owner, fd, condition))
239
240   def UnregisterIO(self, fd):
241     """Unregister a file descriptor.
242
243     It'll be unregistered the next time the mainloop checks for it.
244
245     @type fd: int
246     @param fd: File descriptor
247
248     """
249     # select.Poller also supports file() like objects, but we don't.
250     assert isinstance(fd, (int, long)), \
251       "Only integers are supported for file descriptors"
252
253     self._io_wait_remove.append(fd)
254
255   def RegisterSignal(self, owner):
256     """Registers a receiver for signal notifications
257
258     The receiver must support a "OnSignal(self, signum)" function.
259
260     @type owner: instance
261     @param owner: Receiver
262
263     """
264     self._signal_wait.append(owner)
265
266   def AddTimer(self, owner, interval, repeat):
267     """Add a new timer.
268
269     The receiver must support a "OnTimer(self, timer_id)" function.
270
271     @type owner: instance
272     @param owner: Receiver
273     @type interval: int or float
274     @param interval: Timer interval in seconds
275     @type repeat: bool
276     @param repeat: Whether this is a repeating timer or one-off
277
278     """
279     # TODO: Implement repeating timers
280     assert not repeat, "Repeating timers are not yet supported"
281
282     # Get new ID
283     self._timer_id_last += 1
284
285     timer_id = self._timer_id_last
286
287     self._timer_add.append(Timer(owner, timer_id, time.time(),
288                                  float(interval), repeat))
289
290     return timer_id
291
292   def RemoveTimer(self, timer_id):
293     """Removes a timer.
294
295     @type timer_id: int
296     @param timer_id: Timer ID
297
298     """
299     self._timer_remove.append(timer_id)