verify-disks: Explicitely state nothing has to be done
[ganeti-local] / test / ganeti.daemon_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the daemon module"""
23
24 import unittest
25 import signal
26 import os
27 import socket
28 import time
29 import tempfile
30 import shutil
31
32 from ganeti import daemon
33 from ganeti import errors
34 from ganeti import constants
35 from ganeti import utils
36
37 import testutils
38
39
40 class TestMainloop(testutils.GanetiTestCase):
41   """Test daemon.Mainloop"""
42
43   def setUp(self):
44     testutils.GanetiTestCase.setUp(self)
45     self.mainloop = daemon.Mainloop()
46     self.sendsig_events = []
47     self.onsignal_events = []
48
49   def _CancelEvent(self, handle):
50     self.mainloop.scheduler.cancel(handle)
51
52   def _SendSig(self, sig):
53     self.sendsig_events.append(sig)
54     os.kill(os.getpid(), sig)
55
56   def OnSignal(self, signum):
57     self.onsignal_events.append(signum)
58
59   def testRunAndTermBySched(self):
60     self.mainloop.scheduler.enter(0.1, 1, self._SendSig, [signal.SIGTERM])
61     self.mainloop.Run() # terminates by _SendSig being scheduled
62     self.assertEquals(self.sendsig_events, [signal.SIGTERM])
63
64   def testTerminatingSignals(self):
65     self.mainloop.scheduler.enter(0.1, 1, self._SendSig, [signal.SIGCHLD])
66     self.mainloop.scheduler.enter(0.2, 1, self._SendSig, [signal.SIGINT])
67     self.mainloop.Run()
68     self.assertEquals(self.sendsig_events, [signal.SIGCHLD, signal.SIGINT])
69     self.mainloop.scheduler.enter(0.1, 1, self._SendSig, [signal.SIGTERM])
70     self.mainloop.Run()
71     self.assertEquals(self.sendsig_events, [signal.SIGCHLD, signal.SIGINT,
72                                             signal.SIGTERM])
73
74   def testSchedulerCancel(self):
75     handle = self.mainloop.scheduler.enter(0.1, 1, self._SendSig,
76                                            [signal.SIGTERM])
77     self.mainloop.scheduler.cancel(handle)
78     self.mainloop.scheduler.enter(0.2, 1, self._SendSig, [signal.SIGCHLD])
79     self.mainloop.scheduler.enter(0.3, 1, self._SendSig, [signal.SIGTERM])
80     self.mainloop.Run()
81     self.assertEquals(self.sendsig_events, [signal.SIGCHLD, signal.SIGTERM])
82
83   def testRegisterSignal(self):
84     self.mainloop.RegisterSignal(self)
85     self.mainloop.scheduler.enter(0.1, 1, self._SendSig, [signal.SIGCHLD])
86     handle = self.mainloop.scheduler.enter(0.1, 1, self._SendSig,
87                                            [signal.SIGTERM])
88     self.mainloop.scheduler.cancel(handle)
89     self.mainloop.scheduler.enter(0.2, 1, self._SendSig, [signal.SIGCHLD])
90     self.mainloop.scheduler.enter(0.3, 1, self._SendSig, [signal.SIGTERM])
91     # ...not delievered because they are scheduled after TERM
92     self.mainloop.scheduler.enter(0.4, 1, self._SendSig, [signal.SIGCHLD])
93     self.mainloop.scheduler.enter(0.5, 1, self._SendSig, [signal.SIGCHLD])
94     self.mainloop.Run()
95     self.assertEquals(self.sendsig_events,
96                       [signal.SIGCHLD, signal.SIGCHLD, signal.SIGTERM])
97     self.assertEquals(self.onsignal_events, self.sendsig_events)
98
99   def testDeferredCancel(self):
100     self.mainloop.RegisterSignal(self)
101     now = time.time()
102     self.mainloop.scheduler.enterabs(now + 0.1, 1, self._SendSig,
103                                      [signal.SIGCHLD])
104     handle1 = self.mainloop.scheduler.enterabs(now + 0.3, 2, self._SendSig,
105                                                [signal.SIGCHLD])
106     handle2 = self.mainloop.scheduler.enterabs(now + 0.4, 2, self._SendSig,
107                                                [signal.SIGCHLD])
108     self.mainloop.scheduler.enterabs(now + 0.2, 1, self._CancelEvent,
109                                      [handle1])
110     self.mainloop.scheduler.enterabs(now + 0.2, 1, self._CancelEvent,
111                                      [handle2])
112     self.mainloop.scheduler.enter(0.5, 1, self._SendSig, [signal.SIGTERM])
113     self.mainloop.Run()
114     self.assertEquals(self.sendsig_events, [signal.SIGCHLD, signal.SIGTERM])
115     self.assertEquals(self.onsignal_events, self.sendsig_events)
116
117   def testReRun(self):
118     self.mainloop.RegisterSignal(self)
119     self.mainloop.scheduler.enter(0.1, 1, self._SendSig, [signal.SIGCHLD])
120     self.mainloop.scheduler.enter(0.2, 1, self._SendSig, [signal.SIGCHLD])
121     self.mainloop.scheduler.enter(0.3, 1, self._SendSig, [signal.SIGTERM])
122     self.mainloop.scheduler.enter(0.4, 1, self._SendSig, [signal.SIGCHLD])
123     self.mainloop.scheduler.enter(0.5, 1, self._SendSig, [signal.SIGCHLD])
124     self.mainloop.Run()
125     self.assertEquals(self.sendsig_events,
126                       [signal.SIGCHLD, signal.SIGCHLD, signal.SIGTERM])
127     self.assertEquals(self.onsignal_events, self.sendsig_events)
128     self.mainloop.scheduler.enter(0.3, 1, self._SendSig, [signal.SIGTERM])
129     self.mainloop.Run()
130     self.assertEquals(self.sendsig_events,
131                       [signal.SIGCHLD, signal.SIGCHLD, signal.SIGTERM,
132                        signal.SIGCHLD, signal.SIGCHLD, signal.SIGTERM])
133     self.assertEquals(self.onsignal_events, self.sendsig_events)
134
135   def testPriority(self):
136     # for events at the same time, the highest priority one executes first
137     now = time.time()
138     self.mainloop.scheduler.enterabs(now + 0.1, 2, self._SendSig,
139                                      [signal.SIGCHLD])
140     self.mainloop.scheduler.enterabs(now + 0.1, 1, self._SendSig,
141                                      [signal.SIGTERM])
142     self.mainloop.Run()
143     self.assertEquals(self.sendsig_events, [signal.SIGTERM])
144     self.mainloop.scheduler.enter(0.2, 1, self._SendSig, [signal.SIGTERM])
145     self.mainloop.Run()
146     self.assertEquals(self.sendsig_events,
147                       [signal.SIGTERM, signal.SIGCHLD, signal.SIGTERM])
148
149
150 class _MyAsyncUDPSocket(daemon.AsyncUDPSocket):
151
152   def __init__(self, family):
153     daemon.AsyncUDPSocket.__init__(self, family)
154     self.received = []
155     self.error_count = 0
156
157   def handle_datagram(self, payload, ip, port):
158     self.received.append((payload))
159     if payload == "terminate":
160       os.kill(os.getpid(), signal.SIGTERM)
161     elif payload == "error":
162       raise errors.GenericError("error")
163
164   def handle_error(self):
165     self.error_count += 1
166     raise
167
168
169 class _BaseAsyncUDPSocketTest:
170   """Base class for  AsyncUDPSocket tests"""
171
172   family = None
173   address = None
174
175   def setUp(self):
176     self.mainloop = daemon.Mainloop()
177     self.server = _MyAsyncUDPSocket(self.family)
178     self.client = _MyAsyncUDPSocket(self.family)
179     self.server.bind((self.address, 0))
180     self.port = self.server.getsockname()[1]
181     # Save utils.IgnoreSignals so we can do evil things to it...
182     self.saved_utils_ignoresignals = utils.IgnoreSignals
183
184   def tearDown(self):
185     self.server.close()
186     self.client.close()
187     # ...and restore it as well
188     utils.IgnoreSignals = self.saved_utils_ignoresignals
189     testutils.GanetiTestCase.tearDown(self)
190
191   def testNoDoubleBind(self):
192     self.assertRaises(socket.error, self.client.bind, (self.address, self.port))
193
194   def testAsyncClientServer(self):
195     self.client.enqueue_send(self.address, self.port, "p1")
196     self.client.enqueue_send(self.address, self.port, "p2")
197     self.client.enqueue_send(self.address, self.port, "terminate")
198     self.mainloop.Run()
199     self.assertEquals(self.server.received, ["p1", "p2", "terminate"])
200
201   def testSyncClientServer(self):
202     self.client.handle_write()
203     self.client.enqueue_send(self.address, self.port, "p1")
204     self.client.enqueue_send(self.address, self.port, "p2")
205     while self.client.writable():
206       self.client.handle_write()
207     self.server.process_next_packet()
208     self.assertEquals(self.server.received, ["p1"])
209     self.server.process_next_packet()
210     self.assertEquals(self.server.received, ["p1", "p2"])
211     self.client.enqueue_send(self.address, self.port, "p3")
212     while self.client.writable():
213       self.client.handle_write()
214     self.server.process_next_packet()
215     self.assertEquals(self.server.received, ["p1", "p2", "p3"])
216
217   def testErrorHandling(self):
218     self.client.enqueue_send(self.address, self.port, "p1")
219     self.client.enqueue_send(self.address, self.port, "p2")
220     self.client.enqueue_send(self.address, self.port, "error")
221     self.client.enqueue_send(self.address, self.port, "p3")
222     self.client.enqueue_send(self.address, self.port, "error")
223     self.client.enqueue_send(self.address, self.port, "terminate")
224     self.assertRaises(errors.GenericError, self.mainloop.Run)
225     self.assertEquals(self.server.received,
226                       ["p1", "p2", "error"])
227     self.assertEquals(self.server.error_count, 1)
228     self.assertRaises(errors.GenericError, self.mainloop.Run)
229     self.assertEquals(self.server.received,
230                       ["p1", "p2", "error", "p3", "error"])
231     self.assertEquals(self.server.error_count, 2)
232     self.mainloop.Run()
233     self.assertEquals(self.server.received,
234                       ["p1", "p2", "error", "p3", "error", "terminate"])
235     self.assertEquals(self.server.error_count, 2)
236
237   def testSignaledWhileReceiving(self):
238     utils.IgnoreSignals = lambda fn, *args, **kwargs: None
239     self.client.enqueue_send(self.address, self.port, "p1")
240     self.client.enqueue_send(self.address, self.port, "p2")
241     self.server.handle_read()
242     self.assertEquals(self.server.received, [])
243     self.client.enqueue_send(self.address, self.port, "terminate")
244     utils.IgnoreSignals = self.saved_utils_ignoresignals
245     self.mainloop.Run()
246     self.assertEquals(self.server.received, ["p1", "p2", "terminate"])
247
248   def testOversizedDatagram(self):
249     oversized_data = (constants.MAX_UDP_DATA_SIZE + 1) * "a"
250     self.assertRaises(errors.UdpDataSizeError, self.client.enqueue_send,
251                       self.address, self.port, oversized_data)
252
253
254 class TestAsyncIP4UDPSocket(testutils.GanetiTestCase, _BaseAsyncUDPSocketTest):
255   """Test IP4 daemon.AsyncUDPSocket"""
256
257   family = socket.AF_INET
258   address = "127.0.0.1"
259
260   def setUp(self):
261     testutils.GanetiTestCase.setUp(self)
262     _BaseAsyncUDPSocketTest.setUp(self)
263
264   def tearDown(self):
265     testutils.GanetiTestCase.tearDown(self)
266     _BaseAsyncUDPSocketTest.tearDown(self)
267
268
269 class TestAsyncIP6UDPSocket(testutils.GanetiTestCase, _BaseAsyncUDPSocketTest):
270   """Test IP6 daemon.AsyncUDPSocket"""
271
272   family = socket.AF_INET6
273   address = "::1"
274
275   def setUp(self):
276     testutils.GanetiTestCase.setUp(self)
277     _BaseAsyncUDPSocketTest.setUp(self)
278
279   def tearDown(self):
280     testutils.GanetiTestCase.tearDown(self)
281     _BaseAsyncUDPSocketTest.tearDown(self)
282
283
284 class _MyAsyncStreamServer(daemon.AsyncStreamServer):
285
286   def __init__(self, family, address, handle_connection_fn):
287     daemon.AsyncStreamServer.__init__(self, family, address)
288     self.handle_connection_fn = handle_connection_fn
289     self.error_count = 0
290     self.expt_count = 0
291
292   def handle_connection(self, connected_socket, client_address):
293     self.handle_connection_fn(connected_socket, client_address)
294
295   def handle_error(self):
296     self.error_count += 1
297     self.close()
298     raise
299
300   def handle_expt(self):
301     self.expt_count += 1
302     self.close()
303
304
305 class _MyMessageStreamHandler(daemon.AsyncTerminatedMessageStream):
306
307   def __init__(self, connected_socket, client_address, terminator, family,
308                message_fn, client_id, unhandled_limit):
309     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
310                                                  client_address,
311                                                  terminator, family,
312                                                  unhandled_limit)
313     self.message_fn = message_fn
314     self.client_id = client_id
315     self.error_count = 0
316
317   def handle_message(self, message, message_id):
318     self.message_fn(self, message, message_id)
319
320   def handle_error(self):
321     self.error_count += 1
322     raise
323
324
325 class TestAsyncStreamServerTCP(testutils.GanetiTestCase):
326   """Test daemon.AsyncStreamServer with a TCP connection"""
327
328   family = socket.AF_INET
329
330   def setUp(self):
331     testutils.GanetiTestCase.setUp(self)
332     self.mainloop = daemon.Mainloop()
333     self.address = self.getAddress()
334     self.server = _MyAsyncStreamServer(self.family, self.address,
335                                        self.handle_connection)
336     self.client_handler = _MyMessageStreamHandler
337     self.unhandled_limit = None
338     self.terminator = "\3"
339     self.address = self.server.getsockname()
340     self.clients = []
341     self.connections = []
342     self.messages = {}
343     self.connect_terminate_count = 0
344     self.message_terminate_count = 0
345     self.next_client_id = 0
346     # Save utils.IgnoreSignals so we can do evil things to it...
347     self.saved_utils_ignoresignals = utils.IgnoreSignals
348
349   def tearDown(self):
350     for c in self.clients:
351       c.close()
352     for c in self.connections:
353       c.close()
354     self.server.close()
355     # ...and restore it as well
356     utils.IgnoreSignals = self.saved_utils_ignoresignals
357     testutils.GanetiTestCase.tearDown(self)
358
359   def getAddress(self):
360     return ("127.0.0.1", 0)
361
362   def countTerminate(self, name):
363     value = getattr(self, name)
364     if value is not None:
365       value -= 1
366       setattr(self, name, value)
367       if value <= 0:
368         os.kill(os.getpid(), signal.SIGTERM)
369
370   def handle_connection(self, connected_socket, client_address):
371     client_id = self.next_client_id
372     self.next_client_id += 1
373     client_handler = self.client_handler(connected_socket, client_address,
374                                          self.terminator, self.family,
375                                          self.handle_message,
376                                          client_id, self.unhandled_limit)
377     self.connections.append(client_handler)
378     self.countTerminate("connect_terminate_count")
379
380   def handle_message(self, handler, message, message_id):
381     self.messages.setdefault(handler.client_id, [])
382     # We should just check that the message_ids are monotonically increasing.
383     # If in the unit tests we never remove messages from the received queue,
384     # though, we can just require that the queue length is the same as the
385     # message id, before pushing the message to it. This forces a more
386     # restrictive check, but we can live with this for now.
387     self.assertEquals(len(self.messages[handler.client_id]), message_id)
388     self.messages[handler.client_id].append(message)
389     if message == "error":
390       raise errors.GenericError("error")
391     self.countTerminate("message_terminate_count")
392
393   def getClient(self):
394     client = socket.socket(self.family, socket.SOCK_STREAM)
395     client.connect(self.address)
396     self.clients.append(client)
397     return client
398
399   def tearDown(self):
400     testutils.GanetiTestCase.tearDown(self)
401     self.server.close()
402
403   def testConnect(self):
404     self.getClient()
405     self.mainloop.Run()
406     self.assertEquals(len(self.connections), 1)
407     self.getClient()
408     self.mainloop.Run()
409     self.assertEquals(len(self.connections), 2)
410     self.connect_terminate_count = 4
411     self.getClient()
412     self.getClient()
413     self.getClient()
414     self.getClient()
415     self.mainloop.Run()
416     self.assertEquals(len(self.connections), 6)
417
418   def testBasicMessage(self):
419     self.connect_terminate_count = None
420     client = self.getClient()
421     client.send("ciao\3")
422     self.mainloop.Run()
423     self.assertEquals(len(self.connections), 1)
424     self.assertEquals(len(self.messages[0]), 1)
425     self.assertEquals(self.messages[0][0], "ciao")
426
427   def testDoubleMessage(self):
428     self.connect_terminate_count = None
429     client = self.getClient()
430     client.send("ciao\3")
431     self.mainloop.Run()
432     client.send("foobar\3")
433     self.mainloop.Run()
434     self.assertEquals(len(self.connections), 1)
435     self.assertEquals(len(self.messages[0]), 2)
436     self.assertEquals(self.messages[0][1], "foobar")
437
438   def testComposedMessage(self):
439     self.connect_terminate_count = None
440     self.message_terminate_count = 3
441     client = self.getClient()
442     client.send("one\3composed\3message\3")
443     self.mainloop.Run()
444     self.assertEquals(len(self.messages[0]), 3)
445     self.assertEquals(self.messages[0], ["one", "composed", "message"])
446
447   def testLongTerminator(self):
448     self.terminator = "\0\1\2"
449     self.connect_terminate_count = None
450     self.message_terminate_count = 3
451     client = self.getClient()
452     client.send("one\0\1\2composed\0\1\2message\0\1\2")
453     self.mainloop.Run()
454     self.assertEquals(len(self.messages[0]), 3)
455     self.assertEquals(self.messages[0], ["one", "composed", "message"])
456
457   def testErrorHandling(self):
458     self.connect_terminate_count = None
459     self.message_terminate_count = None
460     client = self.getClient()
461     client.send("one\3two\3error\3three\3")
462     self.assertRaises(errors.GenericError, self.mainloop.Run)
463     self.assertEquals(self.connections[0].error_count, 1)
464     self.assertEquals(self.messages[0], ["one", "two", "error"])
465     client.send("error\3")
466     self.assertRaises(errors.GenericError, self.mainloop.Run)
467     self.assertEquals(self.connections[0].error_count, 2)
468     self.assertEquals(self.messages[0], ["one", "two", "error", "three",
469                                          "error"])
470
471   def testDoubleClient(self):
472     self.connect_terminate_count = None
473     self.message_terminate_count = 2
474     client1 = self.getClient()
475     client2 = self.getClient()
476     client1.send("c1m1\3")
477     client2.send("c2m1\3")
478     self.mainloop.Run()
479     self.assertEquals(self.messages[0], ["c1m1"])
480     self.assertEquals(self.messages[1], ["c2m1"])
481
482   def testUnterminatedMessage(self):
483     self.connect_terminate_count = None
484     self.message_terminate_count = 3
485     client1 = self.getClient()
486     client2 = self.getClient()
487     client1.send("message\3unterminated")
488     client2.send("c2m1\3c2m2\3")
489     self.mainloop.Run()
490     self.assertEquals(self.messages[0], ["message"])
491     self.assertEquals(self.messages[1], ["c2m1", "c2m2"])
492     client1.send("message\3")
493     self.mainloop.Run()
494     self.assertEquals(self.messages[0], ["message", "unterminatedmessage"])
495
496   def testSignaledWhileAccepting(self):
497     utils.IgnoreSignals = lambda fn, *args, **kwargs: None
498     client1 = self.getClient()
499     self.server.handle_accept()
500     # When interrupted while accepting we don't have a connection, but we
501     # didn't crash either.
502     self.assertEquals(len(self.connections), 0)
503     utils.IgnoreSignals = self.saved_utils_ignoresignals
504     self.mainloop.Run()
505     self.assertEquals(len(self.connections), 1)
506
507   def testSendMessage(self):
508     self.connect_terminate_count = None
509     self.message_terminate_count = 3
510     client1 = self.getClient()
511     client2 = self.getClient()
512     client1.send("one\3composed\3message\3")
513     self.mainloop.Run()
514     self.assertEquals(self.messages[0], ["one", "composed", "message"])
515     self.assertFalse(self.connections[0].writable())
516     self.assertFalse(self.connections[1].writable())
517     self.connections[0].send_message("r0")
518     self.assert_(self.connections[0].writable())
519     self.assertFalse(self.connections[1].writable())
520     self.connections[0].send_message("r1")
521     self.connections[0].send_message("r2")
522     # We currently have no way to terminate the mainloop on write events, but
523     # let's assume handle_write will be called if writable() is True.
524     while self.connections[0].writable():
525       self.connections[0].handle_write()
526     client1.setblocking(0)
527     client2.setblocking(0)
528     self.assertEquals(client1.recv(4096), "r0\3r1\3r2\3")
529     self.assertRaises(socket.error, client2.recv, 4096)
530
531   def testLimitedUnhandledMessages(self):
532     self.connect_terminate_count = None
533     self.message_terminate_count = 3
534     self.unhandled_limit = 2
535     client1 = self.getClient()
536     client2 = self.getClient()
537     client1.send("one\3composed\3long\3message\3")
538     client2.send("c2one\3")
539     self.mainloop.Run()
540     self.assertEquals(self.messages[0], ["one", "composed"])
541     self.assertEquals(self.messages[1], ["c2one"])
542     self.assertFalse(self.connections[0].readable())
543     self.assert_(self.connections[1].readable())
544     self.connections[0].send_message("r0")
545     self.message_terminate_count = None
546     client1.send("another\3")
547     # when we write replies messages queued also get handled, but not the ones
548     # in the socket.
549     while self.connections[0].writable():
550       self.connections[0].handle_write()
551     self.assertFalse(self.connections[0].readable())
552     self.assertEquals(self.messages[0], ["one", "composed", "long"])
553     self.connections[0].send_message("r1")
554     self.connections[0].send_message("r2")
555     while self.connections[0].writable():
556       self.connections[0].handle_write()
557     self.assertEquals(self.messages[0], ["one", "composed", "long", "message"])
558     self.assert_(self.connections[0].readable())
559
560   def testLimitedUnhandledMessagesOne(self):
561     self.connect_terminate_count = None
562     self.message_terminate_count = 2
563     self.unhandled_limit = 1
564     client1 = self.getClient()
565     client2 = self.getClient()
566     client1.send("one\3composed\3message\3")
567     client2.send("c2one\3")
568     self.mainloop.Run()
569     self.assertEquals(self.messages[0], ["one"])
570     self.assertEquals(self.messages[1], ["c2one"])
571     self.assertFalse(self.connections[0].readable())
572     self.assertFalse(self.connections[1].readable())
573     self.connections[0].send_message("r0")
574     self.message_terminate_count = None
575     while self.connections[0].writable():
576       self.connections[0].handle_write()
577     self.assertFalse(self.connections[0].readable())
578     self.assertEquals(self.messages[0], ["one", "composed"])
579     self.connections[0].send_message("r2")
580     self.connections[0].send_message("r3")
581     while self.connections[0].writable():
582       self.connections[0].handle_write()
583     self.assertEquals(self.messages[0], ["one", "composed", "message"])
584     self.assert_(self.connections[0].readable())
585
586
587 class TestAsyncStreamServerUnixPath(TestAsyncStreamServerTCP):
588   """Test daemon.AsyncStreamServer with a Unix path connection"""
589
590   family = socket.AF_UNIX
591
592   def getAddress(self):
593     self.tmpdir = tempfile.mkdtemp()
594     return os.path.join(self.tmpdir, "server.sock")
595
596   def tearDown(self):
597     shutil.rmtree(self.tmpdir)
598     TestAsyncStreamServerTCP.tearDown(self)
599
600
601 class TestAsyncAwaker(testutils.GanetiTestCase):
602   """Test daemon.AsyncAwaker"""
603
604   family = socket.AF_INET
605
606   def setUp(self):
607     testutils.GanetiTestCase.setUp(self)
608     self.mainloop = daemon.Mainloop()
609     self.awaker = daemon.AsyncAwaker(signal_fn=self.handle_signal)
610     self.signal_count = 0
611     self.signal_terminate_count = 1
612
613   def tearDown(self):
614     self.awaker.close()
615
616   def handle_signal(self):
617     self.signal_count += 1
618     self.signal_terminate_count -= 1
619     if self.signal_terminate_count <= 0:
620       os.kill(os.getpid(), signal.SIGTERM)
621
622   def testBasicSignaling(self):
623     self.awaker.signal()
624     self.mainloop.Run()
625     self.assertEquals(self.signal_count, 1)
626
627   def testDoubleSignaling(self):
628     self.awaker.signal()
629     self.awaker.signal()
630     self.mainloop.Run()
631     # The second signal is never delivered
632     self.assertEquals(self.signal_count, 1)
633
634   def testReallyDoubleSignaling(self):
635     self.assert_(self.awaker.readable())
636     self.awaker.signal()
637     # Let's suppose two threads overlap, and both find need_signal True
638     self.awaker.need_signal = True
639     self.awaker.signal()
640     self.mainloop.Run()
641     # We still get only one signaling
642     self.assertEquals(self.signal_count, 1)
643
644   def testNoSignalFnArgument(self):
645     myawaker = daemon.AsyncAwaker()
646     self.assertRaises(socket.error, myawaker.handle_read)
647     myawaker.signal()
648     myawaker.handle_read()
649     self.assertRaises(socket.error, myawaker.handle_read)
650     myawaker.signal()
651     myawaker.signal()
652     myawaker.handle_read()
653     self.assertRaises(socket.error, myawaker.handle_read)
654     myawaker.close()
655
656   def testWrongSignalFnArgument(self):
657     self.assertRaises(AssertionError, daemon.AsyncAwaker, 1)
658     self.assertRaises(AssertionError, daemon.AsyncAwaker, "string")
659     self.assertRaises(AssertionError, daemon.AsyncAwaker, signal_fn=1)
660     self.assertRaises(AssertionError, daemon.AsyncAwaker, signal_fn="string")
661
662
663 if __name__ == "__main__":
664   testutils.GanetiTestProgram()