"""Integration tests for the checker submodule.""" import mock import pytest from flake8 import checker from flake8._compat import importlib_metadata from flake8.plugins import manager from flake8.processor import FileProcessor PHYSICAL_LINE = "# Physical line content" EXPECTED_REPORT = (1, 1, 'T000 Expected Message') EXPECTED_REPORT_PHYSICAL_LINE = (1, 'T000 Expected Message') EXPECTED_RESULT_PHYSICAL_LINE = ( 'T000', 0, 1, 'Expected Message', None, ) class PluginClass(object): """Simple file plugin class yielding the expected report.""" name = 'test' version = '1.0.0' def __init__(self, tree): """Construct a dummy object to provide mandatory parameter.""" pass def run(self): """Run class yielding one element containing the expected report.""" yield EXPECTED_REPORT + (type(self), ) def plugin_func(func): """Decorate file plugins which are implemented as functions.""" func.name = 'test' func.version = '1.0.0' return func @plugin_func def plugin_func_gen(tree): """Yield the expected report.""" yield EXPECTED_REPORT + (type(plugin_func_gen), ) @plugin_func def plugin_func_list(tree): """Return a list of expected reports.""" return [EXPECTED_REPORT + (type(plugin_func_list), )] @plugin_func def plugin_func_physical_ret(physical_line): """Expect report from a physical_line. Single return.""" return EXPECTED_REPORT_PHYSICAL_LINE @plugin_func def plugin_func_physical_none(physical_line): """Expect report from a physical_line. No results.""" return None @plugin_func def plugin_func_physical_list_single(physical_line): """Expect report from a physical_line. List of single result.""" return [EXPECTED_REPORT_PHYSICAL_LINE] @plugin_func def plugin_func_physical_list_multiple(physical_line): """Expect report from a physical_line. List of multiple results.""" return [EXPECTED_REPORT_PHYSICAL_LINE] * 2 @plugin_func def plugin_func_physical_gen_single(physical_line): """Expect report from a physical_line. Generator of single result.""" yield EXPECTED_REPORT_PHYSICAL_LINE @plugin_func def plugin_func_physical_gen_multiple(physical_line): """Expect report from a physical_line. Generator of multiple results.""" for _ in range(3): yield EXPECTED_REPORT_PHYSICAL_LINE def mock_file_checker_with_plugin(plugin_target): """Get a mock FileChecker class with plugin_target registered. Useful as a starting point for mocking reports/results. """ # Mock an entry point returning the plugin target entry_point = mock.Mock(spec=['load']) entry_point.name = plugin_target.name entry_point.load.return_value = plugin_target entry_point.value = 'mocked:value' # Load the checker plugins using the entry point mock with mock.patch.object( importlib_metadata, 'entry_points', return_value={'flake8.extension': [entry_point]}, ): checks = manager.Checkers() # Prevent it from reading lines from stdin or somewhere else with mock.patch('flake8.processor.FileProcessor.read_lines', return_value=['Line 1']): file_checker = checker.FileChecker( '-', checks.to_dictionary(), mock.MagicMock() ) return file_checker @pytest.mark.parametrize('plugin_target', [ PluginClass, plugin_func_gen, plugin_func_list, ]) def test_handle_file_plugins(plugin_target): """Test the FileChecker class handling different file plugin types.""" file_checker = mock_file_checker_with_plugin(plugin_target) # Do not actually build an AST file_checker.processor.build_ast = lambda: True # Forward reports to this mock report = mock.Mock() file_checker.report = report file_checker.run_ast_checks() report.assert_called_once_with(error_code=None, line_number=EXPECTED_REPORT[0], column=EXPECTED_REPORT[1], text=EXPECTED_REPORT[2]) @pytest.mark.parametrize('plugin_target,len_results', [ (plugin_func_physical_ret, 1), (plugin_func_physical_none, 0), (plugin_func_physical_list_single, 1), (plugin_func_physical_list_multiple, 2), (plugin_func_physical_gen_single, 1), (plugin_func_physical_gen_multiple, 3), ]) def test_line_check_results(plugin_target, len_results): """Test the FileChecker class handling results from line checks.""" file_checker = mock_file_checker_with_plugin(plugin_target) # Results will be stored in an internal array file_checker.run_physical_checks(PHYSICAL_LINE) expected = [EXPECTED_RESULT_PHYSICAL_LINE] * len_results assert file_checker.results == expected def test_logical_line_offset_out_of_bounds(): """Ensure that logical line offsets that are out of bounds do not crash.""" @plugin_func def _logical_line_out_of_bounds(logical_line): yield 10000, 'L100 test' file_checker = mock_file_checker_with_plugin(_logical_line_out_of_bounds) logical_ret = ( '', 'print("xxxxxxxxxxx")', [(0, (1, 0)), (5, (1, 5)), (6, (1, 6)), (19, (1, 19)), (20, (1, 20))], ) with mock.patch.object( FileProcessor, 'build_logical_line', return_value=logical_ret, ): file_checker.run_logical_checks() assert file_checker.results == [('L100', 0, 0, 'test', None)] PLACEHOLDER_CODE = 'some_line = "of" * code' @pytest.mark.parametrize('results, expected_order', [ # No entries should be added ([], []), # Results are correctly ordered ([('A101', 1, 1, 'placeholder error', PLACEHOLDER_CODE), ('A101', 2, 1, 'placeholder error', PLACEHOLDER_CODE)], [0, 1]), # Reversed order of lines ([('A101', 2, 1, 'placeholder error', PLACEHOLDER_CODE), ('A101', 1, 1, 'placeholder error', PLACEHOLDER_CODE)], [1, 0]), # Columns are not ordered correctly (when reports are ordered correctly) ([('A101', 1, 2, 'placeholder error', PLACEHOLDER_CODE), ('A101', 1, 1, 'placeholder error', PLACEHOLDER_CODE), ('A101', 2, 1, 'placeholder error', PLACEHOLDER_CODE)], [1, 0, 2]), ([('A101', 2, 1, 'placeholder error', PLACEHOLDER_CODE), ('A101', 1, 1, 'placeholder error', PLACEHOLDER_CODE), ('A101', 1, 2, 'placeholder error', PLACEHOLDER_CODE)], [1, 2, 0]), ([('A101', 1, 2, 'placeholder error', PLACEHOLDER_CODE), ('A101', 2, 2, 'placeholder error', PLACEHOLDER_CODE), ('A101', 2, 1, 'placeholder error', PLACEHOLDER_CODE)], [0, 2, 1]), ([('A101', 1, 3, 'placeholder error', PLACEHOLDER_CODE), ('A101', 2, 2, 'placeholder error', PLACEHOLDER_CODE), ('A101', 3, 1, 'placeholder error', PLACEHOLDER_CODE)], [0, 1, 2]), ([('A101', 1, 1, 'placeholder error', PLACEHOLDER_CODE), ('A101', 1, 3, 'placeholder error', PLACEHOLDER_CODE), ('A101', 2, 2, 'placeholder error', PLACEHOLDER_CODE)], [0, 1, 2]), # Previously sort column and message (so reversed) (see bug 196) ([('A101', 1, 1, 'placeholder error', PLACEHOLDER_CODE), ('A101', 2, 1, 'charlie error', PLACEHOLDER_CODE)], [0, 1]), ]) def test_report_order(results, expected_order): """ Test in which order the results will be reported. It gets a list of reports from the file checkers and verifies that the result will be ordered independent from the original report. """ def count_side_effect(name, sorted_results): """Side effect for the result handler to tell all are reported.""" return len(sorted_results) # To simplify the parameters (and prevent copy & pasting) reuse report # tuples to create the expected result lists from the indexes expected_results = [results[index] for index in expected_order] file_checker = mock.Mock(spec=['results', 'display_name']) file_checker.results = results file_checker.display_name = 'placeholder' style_guide = mock.MagicMock(spec=['options', 'processing_file']) # Create a placeholder manager without arguments or plugins # Just add one custom file checker which just provides the results manager = checker.Manager(style_guide, [], []) manager.checkers = manager._all_checkers = [file_checker] # _handle_results is the first place which gets the sorted result # Should something non-private be mocked instead? handler = mock.Mock(side_effect=count_side_effect) with mock.patch.object(manager, '_handle_results', handler): assert manager.report() == (len(results), len(results)) handler.assert_called_once_with('placeholder', expected_results) def test_acquire_when_multiprocessing_pool_can_initialize(): """Verify successful importing of hardware semaphore support. Mock the behaviour of a platform that has a hardware sem_open implementation, and then attempt to initialize a multiprocessing Pool object. This simulates the behaviour on most common platforms. """ with mock.patch("multiprocessing.Pool") as pool: result = checker._try_initialize_processpool(2) pool.assert_called_once_with(2, checker._pool_init) assert result is pool.return_value def test_acquire_when_multiprocessing_pool_can_not_initialize(): """Verify unsuccessful importing of hardware semaphore support. Mock the behaviour of a platform that has not got a hardware sem_open implementation, and then attempt to initialize a multiprocessing Pool object. This scenario will occur on platforms such as Termux and on some more exotic devices. https://github.com/python/cpython/blob/4e02981de0952f54bf87967f8e10d169d6946b40/Lib/multiprocessing/synchronize.py#L30-L33 """ with mock.patch("multiprocessing.Pool", side_effect=ImportError) as pool: result = checker._try_initialize_processpool(2) pool.assert_called_once_with(2, checker._pool_init) assert result is None