Server : Apache/2.4.18 (Ubuntu) System : Linux canvaswebdesign 3.13.0-71-generic #114-Ubuntu SMP Tue Dec 1 02:34:22 UTC 2015 x86_64 User : oppastar ( 1041) PHP Version : 7.0.33-0ubuntu0.16.04.15 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority, Directory : /usr/lib/python3/dist-packages/fail2ban/tests/ |
Upload File : |
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*- # vi: set ft=python sts=4 ts=4 sw=4 noet : # This file is part of Fail2Ban. # # Fail2Ban is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # Fail2Ban is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Fail2Ban; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. __author__ = "Yaroslav Halchenko" __copyright__ = "Copyright (c) 2013 Yaroslav Halchenko" __license__ = "GPL" import logging import os import re import sys import time import unittest from io import StringIO from ..server.mytime import MyTime from ..helpers import getLogger logSys = getLogger(__name__) CONFIG_DIR = os.environ.get('FAIL2BAN_CONFIG_DIR', None) if not CONFIG_DIR: # Use heuristic to figure out where configuration files are if os.path.exists(os.path.join('config','fail2ban.conf')): CONFIG_DIR = 'config' else: CONFIG_DIR = '/etc/fail2ban' def mtimesleep(): # no sleep now should be necessary since polling tracks now not only # mtime but also ino and size pass old_TZ = os.environ.get('TZ', None) def setUpMyTime(): # Set the time to a fixed, known value # Sun Aug 14 12:00:00 CEST 2005 # yoh: we need to adjust TZ to match the one used by Cyril so all the timestamps match os.environ['TZ'] = 'Europe/Zurich' time.tzset() MyTime.setTime(1124013600) def tearDownMyTime(): os.environ.pop('TZ') if old_TZ: os.environ['TZ'] = old_TZ time.tzset() MyTime.myTime = None def gatherTests(regexps=None, no_network=False): # Import all the test cases here instead of a module level to # avoid circular imports from . import banmanagertestcase from . import clientreadertestcase from . import failmanagertestcase from . import filtertestcase from . import servertestcase from . import datedetectortestcase from . import actiontestcase from . import actionstestcase from . import sockettestcase from . import misctestcase from . import databasetestcase from . import samplestestcase if not regexps: # pragma: no cover tests = unittest.TestSuite() else: # pragma: no cover class FilteredTestSuite(unittest.TestSuite): _regexps = [re.compile(r) for r in regexps] def addTest(self, suite): suite_str = str(suite) for r in self._regexps: if r.search(suite_str): super(FilteredTestSuite, self).addTest(suite) return tests = FilteredTestSuite() # Server #tests.addTest(unittest.makeSuite(servertestcase.StartStop)) tests.addTest(unittest.makeSuite(servertestcase.Transmitter)) tests.addTest(unittest.makeSuite(servertestcase.JailTests)) tests.addTest(unittest.makeSuite(servertestcase.RegexTests)) tests.addTest(unittest.makeSuite(servertestcase.LoggingTests)) tests.addTest(unittest.makeSuite(actiontestcase.CommandActionTest)) tests.addTest(unittest.makeSuite(actionstestcase.ExecuteActions)) # FailManager tests.addTest(unittest.makeSuite(failmanagertestcase.AddFailure)) # BanManager tests.addTest(unittest.makeSuite(banmanagertestcase.AddFailure)) try: import dns tests.addTest(unittest.makeSuite(banmanagertestcase.StatusExtendedCymruInfo)) except ImportError: pass # ClientReaders tests.addTest(unittest.makeSuite(clientreadertestcase.ConfigReaderTest)) tests.addTest(unittest.makeSuite(clientreadertestcase.JailReaderTest)) tests.addTest(unittest.makeSuite(clientreadertestcase.FilterReaderTest)) tests.addTest(unittest.makeSuite(clientreadertestcase.JailsReaderTest)) tests.addTest(unittest.makeSuite(clientreadertestcase.JailsReaderTestCache)) # CSocket and AsyncServer tests.addTest(unittest.makeSuite(sockettestcase.Socket)) tests.addTest(unittest.makeSuite(sockettestcase.ClientMisc)) # Misc helpers tests.addTest(unittest.makeSuite(misctestcase.HelpersTest)) tests.addTest(unittest.makeSuite(misctestcase.SetupTest)) tests.addTest(unittest.makeSuite(misctestcase.TestsUtilsTest)) tests.addTest(unittest.makeSuite(misctestcase.CustomDateFormatsTest)) # Database tests.addTest(unittest.makeSuite(databasetestcase.DatabaseTest)) # Filter tests.addTest(unittest.makeSuite(filtertestcase.IgnoreIP)) tests.addTest(unittest.makeSuite(filtertestcase.BasicFilter)) tests.addTest(unittest.makeSuite(filtertestcase.LogFile)) tests.addTest(unittest.makeSuite(filtertestcase.LogFileMonitor)) tests.addTest(unittest.makeSuite(filtertestcase.LogFileFilterPoll)) if not no_network: tests.addTest(unittest.makeSuite(filtertestcase.IgnoreIPDNS)) tests.addTest(unittest.makeSuite(filtertestcase.GetFailures)) tests.addTest(unittest.makeSuite(filtertestcase.DNSUtilsTests)) tests.addTest(unittest.makeSuite(filtertestcase.JailTests)) # DateDetector tests.addTest(unittest.makeSuite(datedetectortestcase.DateDetectorTest)) # Filter Regex tests with sample logs tests.addTest(unittest.makeSuite(samplestestcase.FilterSamplesRegex)) # # Python action testcases # testloader = unittest.TestLoader() from . import action_d for file_ in os.listdir( os.path.abspath(os.path.dirname(action_d.__file__))): if file_.startswith("test_") and file_.endswith(".py"): if no_network and file_ in ['test_badips.py','test_smtp.py']: #pragma: no cover # Test required network continue tests.addTest(testloader.loadTestsFromName( "%s.%s" % (action_d.__name__, os.path.splitext(file_)[0]))) # # Extensive use-tests of different available filters backends # from ..server.filterpoll import FilterPoll filters = [FilterPoll] # always available # Additional filters available only if external modules are available # yoh: Since I do not know better way for parametric tests # with good old unittest try: from ..server.filtergamin import FilterGamin filters.append(FilterGamin) except Exception as e: # pragma: no cover logSys.warning("Skipping gamin backend testing. Got exception '%s'" % e) try: from ..server.filterpyinotify import FilterPyinotify filters.append(FilterPyinotify) except Exception as e: # pragma: no cover logSys.warning("I: Skipping pyinotify backend testing. Got exception '%s'" % e) for Filter_ in filters: tests.addTest(unittest.makeSuite( filtertestcase.get_monitor_failures_testcase(Filter_))) try: # pragma: systemd no cover from ..server.filtersystemd import FilterSystemd tests.addTest(unittest.makeSuite(filtertestcase.get_monitor_failures_journal_testcase(FilterSystemd))) except Exception as e: # pragma: no cover logSys.warning("I: Skipping systemd backend testing. Got exception '%s'" % e) # Server test for logging elements which break logging used to support # testcases analysis tests.addTest(unittest.makeSuite(servertestcase.TransmitterLogging)) return tests class LogCaptureTestCase(unittest.TestCase): def setUp(self): # For extended testing of what gets output into logging # system, we will redirect it to a string logSys = getLogger("fail2ban") # Keep old settings self._old_level = logSys.level self._old_handlers = logSys.handlers # Let's log everything into a string self._log = StringIO() logSys.handlers = [logging.StreamHandler(self._log)] if self._old_level < logging.DEBUG: # so if HEAVYDEBUG etc -- show them! logSys.handlers += self._old_handlers logSys.setLevel(getattr(logging, 'DEBUG')) def tearDown(self): """Call after every test case.""" # print "O: >>%s<<" % self._log.getvalue() logSys = getLogger("fail2ban") logSys.handlers = self._old_handlers logSys.level = self._old_level def _is_logged(self, s): return s in self._log.getvalue() def getLog(self): return self._log.getvalue() def printLog(self): print((self._log.getvalue())) # Solution from http://stackoverflow.com/questions/568271/how-to-check-if-there-exists-a-process-with-a-given-pid # under cc by-sa 3.0 if os.name == 'posix': def pid_exists(pid): """Check whether pid exists in the current process table.""" import errno if pid < 0: return False try: os.kill(pid, 0) except OSError as e: return e.errno == errno.EPERM else: return True else: def pid_exists(pid): import ctypes kernel32 = ctypes.windll.kernel32 SYNCHRONIZE = 0x100000 process = kernel32.OpenProcess(SYNCHRONIZE, 0, pid) if process != 0: kernel32.CloseHandle(process) return True else: return False