4 # Copyright (C) 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.utils.log"""
30 from cStringIO import StringIO
32 from ganeti import constants
33 from ganeti import errors
34 from ganeti import compat
35 from ganeti import utils
40 class TestLogHandler(unittest.TestCase):
42 tmpfile = tempfile.NamedTemporaryFile()
44 handler = utils.log._ReopenableLogHandler(tmpfile.name)
45 handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
47 logger = logging.Logger("TestLogger")
48 logger.addHandler(handler)
49 self.assertEqual(len(logger.handlers), 1)
51 logger.error("Test message ERROR")
52 logger.info("Test message INFO")
54 logger.removeHandler(handler)
55 self.assertFalse(logger.handlers)
58 self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 2)
61 tmpfile = tempfile.NamedTemporaryFile()
62 tmpfile2 = tempfile.NamedTemporaryFile()
64 handler = utils.log._ReopenableLogHandler(tmpfile.name)
66 self.assertFalse(utils.ReadFile(tmpfile.name))
67 self.assertFalse(utils.ReadFile(tmpfile2.name))
69 logger = logging.Logger("TestLoggerReopen")
70 logger.addHandler(handler)
73 logger.error("Test message ERROR")
75 self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 3)
76 before_id = utils.GetFileID(tmpfile.name)
78 handler.RequestReopen()
79 self.assertTrue(handler._reopen)
80 self.assertTrue(utils.VerifyFileID(utils.GetFileID(tmpfile.name),
83 # Rename only after requesting reopen
84 os.rename(tmpfile.name, tmpfile2.name)
85 assert not os.path.exists(tmpfile.name)
87 # Write another message, should reopen
89 logger.info("Test message INFO")
92 self.assertFalse(handler._reopen)
94 self.assertFalse(utils.VerifyFileID(utils.GetFileID(tmpfile.name),
97 logger.removeHandler(handler)
98 self.assertFalse(logger.handlers)
101 self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 4)
102 self.assertEqual(len(utils.ReadFile(tmpfile2.name).splitlines()), 3)
104 def testConsole(self):
105 for (console, check) in [(None, False),
106 (tempfile.NamedTemporaryFile(), True),
107 (self._FailingFile(os.devnull), False)]:
108 # Create a handler which will fail when handling errors
109 cls = utils.log._LogErrorsToConsole(self._FailingHandler)
111 # Instantiate handler with file which will fail when writing,
112 # provoking a write to the console
113 handler = cls(console, self._FailingFile(os.devnull))
115 logger = logging.Logger("TestLogger")
116 logger.addHandler(handler)
117 self.assertEqual(len(logger.handlers), 1)
120 logger.error("Test message ERROR")
122 # Take everything apart
123 logger.removeHandler(handler)
124 self.assertFalse(logger.handlers)
127 if console and check:
130 # Check console output
131 consout = utils.ReadFile(console.name)
132 self.assertTrue("Cannot log message" in consout)
133 self.assertTrue("Test message ERROR" in consout)
135 class _FailingFile(file):
139 class _FailingHandler(logging.StreamHandler):
140 def handleError(self, _):
144 class TestSetupLogging(unittest.TestCase):
146 self.tmpdir = tempfile.mkdtemp()
149 shutil.rmtree(self.tmpdir)
151 def testSimple(self):
152 logfile = utils.PathJoin(self.tmpdir, "basic.log")
153 logger = logging.Logger("TestLogger")
154 self.assertTrue(callable(utils.SetupLogging(logfile, "test",
155 console_logging=False,
156 syslog=constants.SYSLOG_NO,
157 stderr_logging=False,
159 root_logger=logger)))
160 self.assertEqual(utils.ReadFile(logfile), "")
161 logger.error("This is a test")
163 # Ensure SetupLogging used custom logger
164 logging.error("This message should not show up in the test log file")
166 self.assertTrue(utils.ReadFile(logfile).endswith("This is a test\n"))
168 def testReopen(self):
169 logfile = utils.PathJoin(self.tmpdir, "reopen.log")
170 logfile2 = utils.PathJoin(self.tmpdir, "reopen.log.OLD")
171 logger = logging.Logger("TestLogger")
172 reopen_fn = utils.SetupLogging(logfile, "test",
173 console_logging=False,
174 syslog=constants.SYSLOG_NO,
175 stderr_logging=False,
178 self.assertTrue(callable(reopen_fn))
180 self.assertEqual(utils.ReadFile(logfile), "")
181 logger.error("This is a test")
182 self.assertTrue(utils.ReadFile(logfile).endswith("This is a test\n"))
184 os.rename(logfile, logfile2)
185 assert not os.path.exists(logfile)
187 # Notify logger to reopen on the next message
189 assert not os.path.exists(logfile)
191 # Provoke actual reopen
192 logger.error("First message")
194 self.assertTrue(utils.ReadFile(logfile).endswith("First message\n"))
195 self.assertTrue(utils.ReadFile(logfile2).endswith("This is a test\n"))
198 class TestSetupToolLogging(unittest.TestCase):
200 error_name = logging.getLevelName(logging.ERROR)
201 warn_name = logging.getLevelName(logging.WARNING)
202 info_name = logging.getLevelName(logging.INFO)
203 debug_name = logging.getLevelName(logging.DEBUG)
205 for debug in [False, True]:
206 for verbose in [False, True]:
207 logger = logging.Logger("TestLogger")
210 utils.SetupToolLogging(debug, verbose, _root_logger=logger, _stream=buf)
212 logger.error("level=error")
213 logger.warning("level=warning")
214 logger.info("level=info")
215 logger.debug("level=debug")
217 lines = buf.getvalue().splitlines()
219 self.assertTrue(compat.all(line.count(":") == 3 for line in lines))
221 messages = [line.split(":", 3)[-1].strip() for line in lines]
224 self.assertEqual(messages, [
225 "%s level=error" % error_name,
226 "%s level=warning" % warn_name,
227 "%s level=info" % info_name,
228 "%s level=debug" % debug_name,
231 self.assertEqual(messages, [
232 "%s level=error" % error_name,
233 "%s level=warning" % warn_name,
234 "%s level=info" % info_name,
237 self.assertEqual(messages, [
242 def testThreadName(self):
243 thread_name = threading.currentThread().getName()
245 for enable_threadname in [False, True]:
246 logger = logging.Logger("TestLogger")
249 utils.SetupToolLogging(True, True, threadname=enable_threadname,
250 _root_logger=logger, _stream=buf)
252 logger.debug("test134042376")
254 lines = buf.getvalue().splitlines()
255 self.assertEqual(len(lines), 1)
257 if enable_threadname:
258 self.assertTrue((" %s " % thread_name) in lines[0])
260 self.assertTrue(thread_name not in lines[0])
263 if __name__ == "__main__":
264 testutils.GanetiTestProgram()