verify-disks: Explicitely state nothing has to be done
[ganeti-local] / test / ganeti.utils.log_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.utils.log"""
23
24 import os
25 import unittest
26 import logging
27 import tempfile
28 import shutil
29
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import utils
33
34 import testutils
35
36
37 class TestLogHandler(unittest.TestCase):
38   def testNormal(self):
39     tmpfile = tempfile.NamedTemporaryFile()
40
41     handler = utils.log._ReopenableLogHandler(tmpfile.name)
42     handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
43
44     logger = logging.Logger("TestLogger")
45     logger.addHandler(handler)
46     self.assertEqual(len(logger.handlers), 1)
47
48     logger.error("Test message ERROR")
49     logger.info("Test message INFO")
50
51     logger.removeHandler(handler)
52     self.assertFalse(logger.handlers)
53     handler.close()
54
55     self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 2)
56
57   def testReopen(self):
58     tmpfile = tempfile.NamedTemporaryFile()
59     tmpfile2 = tempfile.NamedTemporaryFile()
60
61     handler = utils.log._ReopenableLogHandler(tmpfile.name)
62
63     self.assertFalse(utils.ReadFile(tmpfile.name))
64     self.assertFalse(utils.ReadFile(tmpfile2.name))
65
66     logger = logging.Logger("TestLoggerReopen")
67     logger.addHandler(handler)
68
69     for _ in range(3):
70       logger.error("Test message ERROR")
71     handler.flush()
72     self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 3)
73     before_id = utils.GetFileID(tmpfile.name)
74
75     handler.RequestReopen()
76     self.assertTrue(handler._reopen)
77     self.assertTrue(utils.VerifyFileID(utils.GetFileID(tmpfile.name),
78                                        before_id))
79
80     # Rename only after requesting reopen
81     os.rename(tmpfile.name, tmpfile2.name)
82     assert not os.path.exists(tmpfile.name)
83
84     # Write another message, should reopen
85     for _ in range(4):
86       logger.info("Test message INFO")
87
88       # Flag must be reset
89       self.assertFalse(handler._reopen)
90
91       self.assertFalse(utils.VerifyFileID(utils.GetFileID(tmpfile.name),
92                                           before_id))
93
94     logger.removeHandler(handler)
95     self.assertFalse(logger.handlers)
96     handler.close()
97
98     self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 4)
99     self.assertEqual(len(utils.ReadFile(tmpfile2.name).splitlines()), 3)
100
101   def testConsole(self):
102     for (console, check) in [(None, False),
103                              (tempfile.NamedTemporaryFile(), True),
104                              (self._FailingFile(os.devnull), False)]:
105       # Create a handler which will fail when handling errors
106       cls = utils.log._LogErrorsToConsole(self._FailingHandler)
107
108       # Instantiate handler with file which will fail when writing,
109       # provoking a write to the console
110       handler = cls(console, self._FailingFile(os.devnull))
111
112       logger = logging.Logger("TestLogger")
113       logger.addHandler(handler)
114       self.assertEqual(len(logger.handlers), 1)
115
116       # Provoke write
117       logger.error("Test message ERROR")
118
119       # Take everything apart
120       logger.removeHandler(handler)
121       self.assertFalse(logger.handlers)
122       handler.close()
123
124       if console and check:
125         console.flush()
126
127         # Check console output
128         consout = utils.ReadFile(console.name)
129         self.assertTrue("Cannot log message" in consout)
130         self.assertTrue("Test message ERROR" in consout)
131
132   class _FailingFile(file):
133     def write(self, _):
134       raise Exception
135
136   class _FailingHandler(logging.StreamHandler):
137     def handleError(self, _):
138       raise Exception
139
140
141 class TestSetupLogging(unittest.TestCase):
142   def setUp(self):
143     self.tmpdir = tempfile.mkdtemp()
144
145   def tearDown(self):
146     shutil.rmtree(self.tmpdir)
147
148   def testSimple(self):
149     logfile = utils.PathJoin(self.tmpdir, "basic.log")
150     logger = logging.Logger("TestLogger")
151     self.assertTrue(callable(utils.SetupLogging(logfile, "test",
152                                                 console_logging=False,
153                                                 syslog=constants.SYSLOG_NO,
154                                                 stderr_logging=False,
155                                                 multithreaded=False,
156                                                 root_logger=logger)))
157     self.assertEqual(utils.ReadFile(logfile), "")
158     logger.error("This is a test")
159
160     # Ensure SetupLogging used custom logger
161     logging.error("This message should not show up in the test log file")
162
163     self.assertTrue(utils.ReadFile(logfile).endswith("This is a test\n"))
164
165   def testReopen(self):
166     logfile = utils.PathJoin(self.tmpdir, "reopen.log")
167     logfile2 = utils.PathJoin(self.tmpdir, "reopen.log.OLD")
168     logger = logging.Logger("TestLogger")
169     reopen_fn = utils.SetupLogging(logfile, "test",
170                                    console_logging=False,
171                                    syslog=constants.SYSLOG_NO,
172                                    stderr_logging=False,
173                                    multithreaded=False,
174                                    root_logger=logger)
175     self.assertTrue(callable(reopen_fn))
176
177     self.assertEqual(utils.ReadFile(logfile), "")
178     logger.error("This is a test")
179     self.assertTrue(utils.ReadFile(logfile).endswith("This is a test\n"))
180
181     os.rename(logfile, logfile2)
182     assert not os.path.exists(logfile)
183
184     # Notify logger to reopen on the next message
185     reopen_fn()
186     assert not os.path.exists(logfile)
187
188     # Provoke actual reopen
189     logger.error("First message")
190
191     self.assertTrue(utils.ReadFile(logfile).endswith("First message\n"))
192     self.assertTrue(utils.ReadFile(logfile2).endswith("This is a test\n"))
193
194
195 if __name__ == "__main__":
196   testutils.GanetiTestProgram()