# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
"""Tests for coverage.data, and coverage.sqldata."""
from __future__ import annotations
import glob
import os
import os.path
import re
import sqlite3
import threading
from collections.abc import Collection, Iterable, Mapping
from typing import Any, Callable, TypeVar
from unittest import mock
import pytest
from coverage.data import CoverageData, combine_parallel_data
from coverage.data import add_data_to_hash, line_counts
from coverage.exceptions import DataError, NoDataError
from coverage.files import PathAliases, canonical_filename
from coverage.types import FilePathClasses, FilePathType, TArc, TLineNo
from tests.coveragetest import CoverageTest
from tests.helpers import DebugControlString, assert_count_equal
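# Test data: the LINES_* dicts map file names to sets of executed line
# numbers, and the SUMMARY_* / MEASURED_FILES_* constants are the expected
# line counts and measured-file lists for that data.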
LINES_1 = {
"a.py": {1, 2},
"b.py": {3},
}
SUMMARY_1 = {"a.py": 2, "b.py": 1}
MEASURED_FILES_1 = ["a.py", "b.py"]
A_PY_LINES_1 = [1, 2]
B_PY_LINES_1 = [3]
LINES_2 = {
"a.py": {1, 5},
"c.py": {17},
}
SUMMARY_1_2 = {"a.py": 3, "b.py": 1, "c.py": 1}
MEASURED_FILES_1_2 = ["a.py", "b.py", "c.py"]
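# Arc data maps file names to sets of (from_line, to_line) pairs.  In these
# sets, -1 is coverage.py's marker for entry to or exit from the measured
# code: (-1, 1) means "entered at line 1", (3, -1) means "exited after line 3".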
ARCS_3 = {
"x.py": {(-1, 1), (1, 2), (2, 3), (3, -1)},
"y.py": {(-1, 17), (17, 23), (23, -1)},
}
X_PY_ARCS_3 = [(-1, 1), (1, 2), (2, 3), (3, -1)]
Y_PY_ARCS_3 = [(-1, 17), (17, 23), (23, -1)]
SUMMARY_3 = {"x.py": 3, "y.py": 2}
MEASURED_FILES_3 = ["x.py", "y.py"]
X_PY_LINES_3 = [1, 2, 3]
Y_PY_LINES_3 = [17, 23]
ARCS_4 = {
"x.py": {(-1, 2), (2, 5), (5, -1)},
"z.py": {(-1, 1000), (1000, -1)},
}
SUMMARY_3_4 = {"x.py": 4, "y.py": 2, "z.py": 1}
MEASURED_FILES_3_4 = ["x.py", "y.py", "z.py"]
def DebugCoverageData(*args: Any, **kwargs: Any) -> CoverageData:
"""Factory for CovergeData instances with debugging turned on.
This lets us exercise the debugging lines in sqldata.py. We don't make
any assertions about the debug output, but at least we can know that they
execute successfully, and they won't be marked as distracting missing
lines in our coverage reports.
In the tests in this file we usually use DebugCoverageData, but sometimes
a plain CoverageData, and some tests are parameterized to run once with
each, so that we get a mix of runs with and without debugging.
"""
assert "debug" not in kwargs
options = ["dataio", "dataop", "sql"]
if kwargs:
# There's no logical reason kwargs should imply sqldata debugging.
# This is just a way to get a mix of debug options across the tests.
options.extend(["dataop2", "sqldata"])
debug = DebugControlString(options=options)
return CoverageData(*args, debug=debug, **kwargs) # type: ignore[misc]
TCoverageData = Callable[..., CoverageData]
def assert_line_counts(
covdata: CoverageData,
counts: Mapping[str, int],
fullpath: bool = False,
) -> None:
"""Check that the line_counts of `covdata` is `counts`."""
assert line_counts(covdata, fullpath) == counts
def assert_measured_files(covdata: CoverageData, measured: Iterable[str]) -> None:
"""Check that `covdata`'s measured files are `measured`."""
assert_count_equal(covdata.measured_files(), measured)
def assert_lines1_data(covdata: CoverageData) -> None:
"""Check that `covdata` has the data from LINES1."""
assert_line_counts(covdata, SUMMARY_1)
assert_measured_files(covdata, MEASURED_FILES_1)
assert_count_equal(covdata.lines("a.py"), A_PY_LINES_1)
assert not covdata.has_arcs()
def assert_arcs3_data(covdata: CoverageData) -> None:
"""Check that `covdata` has the data from ARCS3."""
assert_line_counts(covdata, SUMMARY_3)
assert_measured_files(covdata, MEASURED_FILES_3)
assert_count_equal(covdata.lines("x.py"), X_PY_LINES_3)
assert_count_equal(covdata.arcs("x.py"), X_PY_ARCS_3)
assert_count_equal(covdata.lines("y.py"), Y_PY_LINES_3)
assert_count_equal(covdata.arcs("y.py"), Y_PY_ARCS_3)
assert covdata.has_arcs()
TData = TypeVar("TData", bound=TLineNo | TArc)
def dicts_from_sets(file_data: dict[str, set[TData]]) -> dict[str, dict[TData, None]]:
"""Convert a dict of sets into a dict of dicts.
Before 6.0, file data was a dict with None as the values. In 6.0, file
data is a set. SqlData all along only cared that it was an iterable.
This function helps us test that the old dict format still works.
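For example, {"a.py": {1, 2}} becomes {"a.py": {1: None, 2: None}}.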
"""
return {k: dict.fromkeys(v) for k, v in file_data.items()}
class CoverageDataTest(CoverageTest):
"""Test cases for CoverageData."""
def test_empty_data_is_false(self) -> None:
covdata = DebugCoverageData()
assert not covdata
self.assert_doesnt_exist(".coverage")
def test_empty_data_is_false_when_read(self) -> None:
covdata = DebugCoverageData()
covdata.read()
assert not covdata
self.assert_doesnt_exist(".coverage")
def test_line_data_is_true(self) -> None:
covdata = DebugCoverageData()
covdata.add_lines(LINES_1)
assert covdata
def test_arc_data_is_true(self) -> None:
covdata = DebugCoverageData()
covdata.add_arcs(ARCS_3)
assert covdata
def test_empty_line_data_is_false(self) -> None:
covdata = DebugCoverageData()
covdata.add_lines({})
assert not covdata
def test_empty_arc_data_is_false(self) -> None:
covdata = DebugCoverageData()
covdata.add_arcs({})
assert not covdata
@pytest.mark.parametrize("lines", [LINES_1, dicts_from_sets(LINES_1)])
def test_adding_lines(self, lines: Mapping[str, Collection[TLineNo]]) -> None:
covdata = DebugCoverageData()
covdata.add_lines(lines)
assert_lines1_data(covdata)
@pytest.mark.parametrize("arcs", [ARCS_3, dicts_from_sets(ARCS_3)])
def test_adding_arcs(self, arcs: Mapping[str, Collection[TArc]]) -> None:
covdata = DebugCoverageData()
covdata.add_arcs(arcs)
assert_arcs3_data(covdata)
def test_ok_to_add_lines_twice(self) -> None:
covdata = DebugCoverageData()
covdata.add_lines(LINES_1)
covdata.add_lines(LINES_2)
assert_line_counts(covdata, SUMMARY_1_2)
assert_measured_files(covdata, MEASURED_FILES_1_2)
def test_ok_to_add_arcs_twice(self) -> None:
covdata = DebugCoverageData()
covdata.add_arcs(ARCS_3)
covdata.add_arcs(ARCS_4)
assert_line_counts(covdata, SUMMARY_3_4)
assert_measured_files(covdata, MEASURED_FILES_3_4)
def test_ok_to_add_empty_arcs(self) -> None:
covdata = DebugCoverageData()
covdata.add_arcs(ARCS_3)
covdata.add_arcs(ARCS_4)
covdata.add_arcs(dict.fromkeys(ARCS_3, set()))
assert_line_counts(covdata, SUMMARY_3_4)
assert_measured_files(covdata, MEASURED_FILES_3_4)
@pytest.mark.parametrize("klass", [CoverageData, DebugCoverageData])
def test_cant_add_arcs_with_lines(self, klass: TCoverageData) -> None:
covdata = klass()
covdata.add_lines(LINES_1)
msg = "Can't add branch measurements to existing line data"
with pytest.raises(DataError, match=msg):
covdata.add_arcs(ARCS_3)
@pytest.mark.parametrize("klass", [CoverageData, DebugCoverageData])
def test_cant_add_lines_with_arcs(self, klass: TCoverageData) -> None:
covdata = klass()
covdata.add_arcs(ARCS_3)
msg = "Can't add line measurements to existing branch data"
with pytest.raises(DataError, match=msg):
covdata.add_lines(LINES_1)
def test_touch_file_with_lines(self) -> None:
covdata = DebugCoverageData()
covdata.add_lines(LINES_1)
covdata.touch_file("zzz.py")
assert_measured_files(covdata, MEASURED_FILES_1 + ["zzz.py"])
def test_touch_file_with_arcs(self) -> None:
covdata = DebugCoverageData()
covdata.add_arcs(ARCS_3)
covdata.touch_file("zzz.py")
assert_measured_files(covdata, MEASURED_FILES_3 + ["zzz.py"])
def test_set_query_contexts(self) -> None:
covdata = DebugCoverageData()
covdata.set_context("test_a")
covdata.add_lines(LINES_1)
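# set_query_contexts takes regex patterns matched against recorded context
# names, so "te.*a" selects the "test_a" context set above.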
covdata.set_query_contexts(["te.*a"])
assert covdata.lines("a.py") == [1, 2]
covdata.set_query_contexts(["other"])
assert covdata.lines("a.py") == []
def test_no_lines_vs_unmeasured_file(self) -> None:
covdata = DebugCoverageData()
covdata.add_lines(LINES_1)
covdata.touch_file("zzz.py")
assert covdata.lines("zzz.py") == []
assert covdata.lines("no_such_file.py") is None
def test_lines_with_contexts(self) -> None:
covdata = DebugCoverageData()
covdata.set_context("test_a")
covdata.add_lines(LINES_1)
assert covdata.lines("a.py") == [1, 2]
covdata.set_query_contexts(["test"])
assert covdata.lines("a.py") == [1, 2]
covdata.set_query_contexts(["other"])
assert covdata.lines("a.py") == []
def test_contexts_by_lineno_with_lines(self) -> None:
covdata = DebugCoverageData()
covdata.set_context("test_a")
covdata.add_lines(LINES_1)
expected = {1: ["test_a"], 2: ["test_a"]}
assert covdata.contexts_by_lineno("a.py") == expected
@pytest.mark.parametrize("lines", [LINES_1, dicts_from_sets(LINES_1)])
def test_no_duplicate_lines(self, lines: Mapping[str, Collection[TLineNo]]) -> None:
covdata = DebugCoverageData()
covdata.set_context("context1")
covdata.add_lines(lines)
covdata.set_context("context2")
covdata.add_lines(lines)
assert covdata.lines("a.py") == A_PY_LINES_1
@pytest.mark.parametrize("arcs", [ARCS_3, dicts_from_sets(ARCS_3)])
def test_no_duplicate_arcs(self, arcs: Mapping[str, Collection[TArc]]) -> None:
covdata = DebugCoverageData()
covdata.set_context("context1")
covdata.add_arcs(arcs)
covdata.set_context("context2")
covdata.add_arcs(arcs)
assert covdata.arcs("x.py") == X_PY_ARCS_3
def test_no_arcs_vs_unmeasured_file(self) -> None:
covdata = DebugCoverageData()
covdata.add_arcs(ARCS_3)
covdata.touch_file("zzz.py")
assert covdata.lines("zzz.py") == []
assert covdata.lines("no_such_file.py") is None
assert covdata.arcs("zzz.py") == []
assert covdata.arcs("no_such_file.py") is None
def test_arcs_with_contexts(self) -> None:
covdata = DebugCoverageData()
covdata.set_context("test_x")
covdata.add_arcs(ARCS_3)
assert covdata.arcs("x.py") == [(-1, 1), (1, 2), (2, 3), (3, -1)]
covdata.set_query_contexts(["test_.$"])
assert covdata.arcs("x.py") == [(-1, 1), (1, 2), (2, 3), (3, -1)]
covdata.set_query_contexts(["other"])
assert covdata.arcs("x.py") == []
def test_contexts_by_lineno_with_arcs(self) -> None:
covdata = DebugCoverageData()
covdata.set_context("test_x")
covdata.add_arcs(ARCS_3)
expected = {1: ["test_x"], 2: ["test_x"], 3: ["test_x"]}
assert covdata.contexts_by_lineno("x.py") == expected
def test_contexts_by_lineno_with_unknown_file(self) -> None:
covdata = DebugCoverageData()
covdata.set_context("test_x")
covdata.add_arcs(ARCS_3)
assert covdata.contexts_by_lineno("xyz.py") == {}
def test_context_by_lineno_with_query_contexts_with_lines(self) -> None:
covdata = DebugCoverageData()
covdata.set_context("test_1")
covdata.add_lines(LINES_1)
covdata.set_context("test_2")
covdata.add_lines(LINES_2)
covdata.set_query_context("test_1")
assert covdata.contexts_by_lineno("a.py") == dict.fromkeys([1, 2], ["test_1"])
def test_context_by_lineno_with_query_contexts_with_arcs(self) -> None:
covdata = DebugCoverageData()
covdata.set_context("test_1")
covdata.add_arcs(ARCS_3)
covdata.set_context("test_2")
covdata.add_arcs(ARCS_4)
covdata.set_query_context("test_1")
assert covdata.contexts_by_lineno("x.py") == dict.fromkeys([1, 2, 3], ["test_1"])
def test_file_tracer_name(self) -> None:
covdata = DebugCoverageData()
covdata.add_lines(
{
"p1.foo": [1, 2, 3],
"p2.html": [10, 11, 12],
"main.py": [20],
}
)
covdata.add_file_tracers({"p1.foo": "p1.plugin", "p2.html": "p2.plugin"})
assert covdata.file_tracer("p1.foo") == "p1.plugin"
assert covdata.file_tracer("p2.html") == "p2.plugin"
assert covdata.file_tracer("main.py") == ""
assert covdata.file_tracer("p3.not_here") is None
def test_ok_to_repeat_file_tracer(self) -> None:
covdata = DebugCoverageData()
covdata.add_lines(
{
"p1.foo": [1, 2, 3],
"p2.html": [10, 11, 12],
}
)
covdata.add_file_tracers({"p1.foo": "p1.plugin", "p2.html": "p2.plugin"})
covdata.add_file_tracers({"p1.foo": "p1.plugin"})
assert covdata.file_tracer("p1.foo") == "p1.plugin"
def test_ok_to_set_empty_file_tracer(self) -> None:
covdata = DebugCoverageData()
covdata.add_lines(
{
"p1.foo": [1, 2, 3],
"p2.html": [10, 11, 12],
"main.py": [20],
}
)
covdata.add_file_tracers({"p1.foo": "p1.plugin", "main.py": ""})
assert covdata.file_tracer("p1.foo") == "p1.plugin"
assert covdata.file_tracer("main.py") == ""
def test_cant_change_file_tracer_name(self) -> None:
covdata = DebugCoverageData()
covdata.add_lines({"p1.foo": [1, 2, 3]})
covdata.add_file_tracers({"p1.foo": "p1.plugin"})
msg = "Conflicting file tracer name for 'p1.foo': 'p1.plugin' vs 'p1.plugin.foo'"
with pytest.raises(DataError, match=msg):
covdata.add_file_tracers({"p1.foo": "p1.plugin.foo"})
def test_update_lines(self) -> None:
covdata1 = DebugCoverageData(suffix="1")
covdata1.add_lines(LINES_1)
covdata2 = DebugCoverageData(suffix="2")
covdata2.add_lines(LINES_2)
covdata3 = DebugCoverageData(suffix="3")
covdata3.update(covdata1)
covdata3.update(covdata2)
assert_line_counts(covdata3, SUMMARY_1_2)
assert_measured_files(covdata3, MEASURED_FILES_1_2)
def test_update_arcs(self) -> None:
covdata1 = DebugCoverageData(suffix="1")
covdata1.add_arcs(ARCS_3)
covdata2 = DebugCoverageData(suffix="2")
covdata2.add_arcs(ARCS_4)
covdata3 = DebugCoverageData(suffix="3")
covdata3.update(covdata1)
covdata3.update(covdata2)
assert_line_counts(covdata3, SUMMARY_3_4)
assert_measured_files(covdata3, MEASURED_FILES_3_4)
def test_update_cant_mix_lines_and_arcs(self) -> None:
covdata1 = DebugCoverageData(suffix="1")
covdata1.add_lines(LINES_1)
covdata2 = DebugCoverageData(suffix="2")
covdata2.add_arcs(ARCS_3)
msg = "Can't combine branch coverage data with statement data"
with pytest.raises(DataError, match=msg):
covdata1.update(covdata2)
msg = "Can't combine statement coverage data with branch data"
with pytest.raises(DataError, match=msg):
covdata2.update(covdata1)
def test_update_file_tracers(self) -> None:
covdata1 = DebugCoverageData(suffix="1")
covdata1.add_lines(
{
"p1.html": [1, 2, 3, 4],
"p2.html": [5, 6, 7],
"main.py": [10, 11, 12],
}
)
covdata1.add_file_tracers(
{
"p1.html": "html.plugin",
"p2.html": "html.plugin2",
}
)
covdata2 = DebugCoverageData(suffix="2")
covdata2.add_lines(
{
"p1.html": [3, 4, 5, 6],
"p2.html": [7, 8, 9],
"p3.foo": [1000, 1001],
"main.py": [10, 11, 12],
}
)
covdata2.add_file_tracers(
{
"p1.html": "html.plugin",
"p2.html": "html.plugin2",
"p3.foo": "foo_plugin",
}
)
covdata3 = DebugCoverageData(suffix="3")
covdata3.update(covdata1)
covdata3.update(covdata2)
assert covdata3.file_tracer("p1.html") == "html.plugin"
assert covdata3.file_tracer("p2.html") == "html.plugin2"
assert covdata3.file_tracer("p3.foo") == "foo_plugin"
assert covdata3.file_tracer("main.py") == ""
def test_update_conflicting_file_tracers(self) -> None:
covdata1 = DebugCoverageData(suffix="1")
covdata1.add_lines({"p1.html": [1, 2, 3]})
covdata1.add_file_tracers({"p1.html": "html.plugin"})
covdata2 = DebugCoverageData(suffix="2")
covdata2.add_lines({"p1.html": [1, 2, 3]})
covdata2.add_file_tracers({"p1.html": "html.other_plugin"})
msg = "Conflicting file tracer name for 'p1.html': 'html.plugin' vs 'html.other_plugin'"
with pytest.raises(DataError, match=msg):
covdata1.update(covdata2)
msg = "Conflicting file tracer name for 'p1.html': 'html.other_plugin' vs 'html.plugin'"
with pytest.raises(DataError, match=msg):
covdata2.update(covdata1)
def test_update_file_tracer_vs_no_file_tracer(self) -> None:
covdata1 = DebugCoverageData(suffix="1")
covdata1.add_lines({"p1.html": [1, 2, 3]})
covdata1.add_file_tracers({"p1.html": "html.plugin"})
covdata2 = DebugCoverageData(suffix="2")
covdata2.add_lines({"p1.html": [1, 2, 3]})
msg = "Conflicting file tracer name for 'p1.html': 'html.plugin' vs ''"
with pytest.raises(DataError, match=msg):
covdata1.update(covdata2)
msg = "Conflicting file tracer name for 'p1.html': '' vs 'html.plugin'"
with pytest.raises(DataError, match=msg):
covdata2.update(covdata1)
def test_update_lines_empty(self) -> None:
covdata1 = DebugCoverageData(suffix="1")
covdata1.add_lines(LINES_1)
covdata2 = DebugCoverageData(suffix="2")
covdata1.update(covdata2)
assert_line_counts(covdata1, SUMMARY_1)
def test_update_arcs_empty(self) -> None:
covdata1 = DebugCoverageData(suffix="1")
covdata1.add_arcs(ARCS_3)
covdata2 = DebugCoverageData(suffix="2")
covdata1.update(covdata2)
assert_line_counts(covdata1, SUMMARY_3)
def test_asking_isnt_measuring(self) -> None:
# Asking about an unmeasured file shouldn't make it seem measured.
covdata = DebugCoverageData()
assert_measured_files(covdata, [])
assert covdata.arcs("missing.py") is None
assert_measured_files(covdata, [])
def test_add_to_hash_with_lines(self) -> None:
covdata = DebugCoverageData()
covdata.add_lines(LINES_1)
hasher = mock.Mock()
add_data_to_hash(covdata, "a.py", hasher)
assert hasher.method_calls == [
mock.call.update([1, 2]), # lines
mock.call.update(""), # file_tracer name
]
def test_add_to_hash_with_arcs(self) -> None:
covdata = DebugCoverageData()
covdata.add_arcs(ARCS_3)
covdata.add_file_tracers({"y.py": "hologram_plugin"})
hasher = mock.Mock()
add_data_to_hash(covdata, "y.py", hasher)
assert hasher.method_calls == [
mock.call.update([(-1, 17), (17, 23), (23, -1)]), # arcs
mock.call.update("hologram_plugin"), # file_tracer name
]
def test_add_to_lines_hash_with_missing_file(self) -> None:
# https://github.com/nedbat/coveragepy/issues/403
covdata = DebugCoverageData()
covdata.add_lines(LINES_1)
hasher = mock.Mock()
add_data_to_hash(covdata, "missing.py", hasher)
assert hasher.method_calls == [
mock.call.update([]),
mock.call.update(None),
]
def test_add_to_arcs_hash_with_missing_file(self) -> None:
# https://github.com/nedbat/coveragepy/issues/403
covdata = DebugCoverageData()
covdata.add_arcs(ARCS_3)
covdata.add_file_tracers({"y.py": "hologram_plugin"})
hasher = mock.Mock()
add_data_to_hash(covdata, "missing.py", hasher)
assert hasher.method_calls == [
mock.call.update([]),
mock.call.update(None),
]
def test_empty_lines_are_still_lines(self) -> None:
covdata = DebugCoverageData()
covdata.add_lines({})
covdata.touch_file("abc.py")
assert not covdata.has_arcs()
def test_empty_arcs_are_still_arcs(self) -> None:
covdata = DebugCoverageData()
covdata.add_arcs({})
covdata.touch_file("abc.py")
assert covdata.has_arcs()
def test_cant_touch_in_empty_data(self) -> None:
covdata = DebugCoverageData()
msg = "Can't touch files in an empty CoverageData"
with pytest.raises(DataError, match=msg):
covdata.touch_file("abc.py")
def test_read_and_write_are_opposites(self) -> None:
covdata1 = DebugCoverageData()
covdata1.add_arcs(ARCS_3)
covdata1.write()
covdata2 = DebugCoverageData()
covdata2.read()
assert_arcs3_data(covdata2)
def test_thread_stress(self) -> None:
covdata = DebugCoverageData()
exceptions = []
def thread_main() -> None:
"""Every thread will try to add the same data."""
try:
covdata.add_lines(LINES_1)
except Exception as ex: # pragma: only failure
exceptions.append(ex)
threads = [threading.Thread(target=thread_main) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
assert_lines1_data(covdata)
assert not exceptions
def test_purge_files_lines(self) -> None:
covdata = DebugCoverageData()
covdata.add_lines(LINES_1)
covdata.add_lines(LINES_2)
assert_line_counts(covdata, SUMMARY_1_2)
covdata.purge_files(["a.py", "b.py"])
assert_line_counts(covdata, {"a.py": 0, "b.py": 0, "c.py": 1})
covdata.purge_files(["c.py"])
assert_line_counts(covdata, {"a.py": 0, "b.py": 0, "c.py": 0})
# It's OK to "purge" a file that wasn't measured.
covdata.purge_files(["xyz.py"])
assert_line_counts(covdata, {"a.py": 0, "b.py": 0, "c.py": 0})
def test_purge_files_arcs(self) -> None:
covdata = CoverageData()
covdata.add_arcs(ARCS_3)
covdata.add_arcs(ARCS_4)
assert_line_counts(covdata, SUMMARY_3_4)
covdata.purge_files(["x.py", "y.py"])
assert_line_counts(covdata, {"x.py": 0, "y.py": 0, "z.py": 1})
covdata.purge_files(["z.py"])
assert_line_counts(covdata, {"x.py": 0, "y.py": 0, "z.py": 0})
def test_cant_purge_in_empty_data(self) -> None:
covdata = DebugCoverageData()
msg = "Can't purge files in an empty CoverageData"
with pytest.raises(DataError, match=msg):
covdata.purge_files(["abc.py"])
class CoverageDataInTempDirTest(CoverageTest):
"""Tests of CoverageData that need a temporary directory to make files."""
@pytest.mark.parametrize("file_class", FilePathClasses)
def test_read_write_lines(self, file_class: FilePathType) -> None:
self.assert_doesnt_exist("lines.dat")
covdata1 = DebugCoverageData(file_class("lines.dat"))
covdata1.add_lines(LINES_1)
covdata1.write()
self.assert_exists("lines.dat")
covdata2 = DebugCoverageData("lines.dat")
covdata2.read()
assert_lines1_data(covdata2)
def test_read_write_arcs(self) -> None:
covdata1 = DebugCoverageData("arcs.dat")
covdata1.add_arcs(ARCS_3)
covdata1.write()
covdata2 = DebugCoverageData("arcs.dat")
covdata2.read()
assert_arcs3_data(covdata2)
def test_read_errors(self) -> None:
self.make_file("xyzzy.dat", "xyzzy")
with pytest.raises(DataError, match=r"Couldn't .* '.*[/\\]xyzzy.dat': \S+"):
covdata = DebugCoverageData("xyzzy.dat")
covdata.read()
assert not covdata
def test_hard_read_error(self) -> None:
self.make_file("noperms.dat", "go away")
os.chmod("noperms.dat", 0)
with pytest.raises(DataError, match=r"Couldn't .* '.*[/\\]noperms.dat': \S+"):
covdata = DebugCoverageData("noperms.dat")
covdata.read()
@pytest.mark.parametrize("klass", [CoverageData, DebugCoverageData])
def test_error_when_closing(self, klass: TCoverageData) -> None:
msg = r"Couldn't .* '.*[/\\]flaked.dat': \S+"
with pytest.raises(DataError, match=msg):
covdata = klass("flaked.dat")
covdata.add_lines(LINES_1)
# I don't know how to make a real error, so let's fake one.
sqldb = list(covdata._dbs.values())[0]
sqldb.close = lambda: 1 / 0 # type: ignore
covdata.add_lines(LINES_1)
def test_wrong_schema_version(self) -> None:
with sqlite3.connect("wrong_schema.db") as con:
con.execute("create table coverage_schema (version integer)")
con.execute("insert into coverage_schema (version) values (99)")
msg = r"Couldn't .* '.*[/\\]wrong_schema.db': wrong schema: 99 instead of \d+"
with pytest.raises(DataError, match=msg):
covdata = DebugCoverageData("wrong_schema.db")
covdata.read()
assert not covdata
def test_wrong_schema_schema(self) -> None:
with sqlite3.connect("wrong_schema_schema.db") as con:
con.execute("create table coverage_schema (xyzzy integer)")
con.execute("insert into coverage_schema (xyzzy) values (99)")
msg = r"Data file .* doesn't seem to be a coverage data file: .* no such column"
with pytest.raises(DataError, match=msg):
covdata = DebugCoverageData("wrong_schema_schema.db")
covdata.read()
assert not covdata
class CoverageDataFilesTest(CoverageTest):
"""Tests of CoverageData file handling."""
def test_reading_missing(self) -> None:
self.assert_doesnt_exist(".coverage")
covdata = DebugCoverageData()
covdata.read()
assert_line_counts(covdata, {})
def test_writing_and_reading(self) -> None:
covdata1 = DebugCoverageData()
covdata1.add_lines(LINES_1)
covdata1.write()
covdata2 = DebugCoverageData()
covdata2.read()
assert_line_counts(covdata2, SUMMARY_1)
def test_debug_output_with_debug_option(self) -> None:
# With debug option dataio, we get debug output about reading and
# writing files.
debug = DebugControlString(options=["dataio"])
covdata1 = CoverageData(debug=debug)
covdata1.add_lines(LINES_1)
covdata1.write()
covdata2 = CoverageData(debug=debug)
covdata2.read()
assert_line_counts(covdata2, SUMMARY_1)
print(debug.get_output())
assert re.search(
r"^"
+ r"Closing dbs, force=False: {}\n"
+ r"Erasing data file '.*\.coverage' \(does not exist\)\n"
+ r"Opening data file '.*\.coverage' \(does not exist\)\n"
+ r"Initing data file '.*\.coverage' \(0 bytes, modified [-:. 0-9]+\)\n"
+ r"Writing \(no-op\) data file '.*\.coverage' \(\d+ bytes, modified [-:. 0-9]+\)\n"
+ r"Opening data file '.*\.coverage' \(\d+ bytes, modified [-:. 0-9]+\)\n"
+ r"$",
debug.get_output(),
)
def test_debug_output_without_debug_option(self) -> None:
# With a debug object, but not the dataio option, we don't get debug
# output.
debug = DebugControlString(options=[])
covdata1 = CoverageData(debug=debug)
covdata1.add_lines(LINES_1)
covdata1.write()
covdata2 = CoverageData(debug=debug)
covdata2.read()
assert_line_counts(covdata2, SUMMARY_1)
assert debug.get_output() == ""
def test_explicit_suffix(self) -> None:
self.assert_doesnt_exist(".coverage.SUFFIX")
covdata = DebugCoverageData(suffix="SUFFIX")
covdata.add_lines(LINES_1)
covdata.write()
self.assert_exists(".coverage.SUFFIX")
self.assert_doesnt_exist(".coverage")
def test_true_suffix(self) -> None:
self.assert_file_count(".coverage.*", 0)
# suffix=True will make a randomly named data file.
covdata1 = DebugCoverageData(suffix=True)
covdata1.add_lines(LINES_1)
covdata1.write()
self.assert_doesnt_exist(".coverage")
data_files1 = glob.glob(".coverage.*")
assert len(data_files1) == 1
# Another suffix=True will choose a different name.
covdata2 = DebugCoverageData(suffix=True)
covdata2.add_lines(LINES_1)
covdata2.write()
self.assert_doesnt_exist(".coverage")
data_files2 = glob.glob(".coverage.*")
assert len(data_files2) == 2
# In addition to being different, the suffixes have the pid in them.
assert all(str(os.getpid()) in fn for fn in data_files2)
def test_combining(self) -> None:
self.assert_file_count(".coverage.*", 0)
covdata1 = DebugCoverageData(suffix="1")
covdata1.add_lines(LINES_1)
covdata1.write()
self.assert_exists(".coverage.1")
self.assert_file_count(".coverage.*", 1)
covdata2 = DebugCoverageData(suffix="2")
covdata2.add_lines(LINES_2)
covdata2.write()
self.assert_exists(".coverage.2")
self.assert_file_count(".coverage.*", 2)
covdata3 = DebugCoverageData()
combine_parallel_data(covdata3)
assert_line_counts(covdata3, SUMMARY_1_2)
assert_measured_files(covdata3, MEASURED_FILES_1_2)
self.assert_file_count(".coverage.*", 0)
def test_erasing(self) -> None:
covdata1 = DebugCoverageData()
covdata1.add_lines(LINES_1)
covdata1.write()
covdata1.erase()
assert_line_counts(covdata1, {})
covdata2 = DebugCoverageData()
covdata2.read()
assert_line_counts(covdata2, {})
def test_erasing_parallel(self) -> None:
self.make_file("datafile.1")
self.make_file("datafile.2")
self.make_file(".coverage")
data = DebugCoverageData("datafile")
data.erase(parallel=True)
self.assert_file_count("datafile.*", 0)
self.assert_exists(".coverage")
def test_combining_with_aliases(self) -> None:
covdata1 = DebugCoverageData(suffix="1")
covdata1.add_lines(
{
"/home/ned/proj/src/a.py": {1, 2},
"/home/ned/proj/src/sub/b.py": {3},
"/home/ned/proj/src/template.html": {10},
}
)
covdata1.add_file_tracers(
{
"/home/ned/proj/src/template.html": "html.plugin",
}
)
covdata1.write()
covdata2 = DebugCoverageData(suffix="2")
covdata2.add_lines(
{
r"c:\ned\test\a.py": {4, 5},
r"c:\ned\test\sub\b.py": {3, 6},
}
)
covdata2.write()
self.assert_file_count(".coverage.*", 2)
self.make_file("a.py", "")
self.make_file("sub/b.py", "")
self.make_file("template.html", "")
covdata3 = DebugCoverageData()
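# PathAliases remaps the recorded Linux and Windows source paths onto the
# local files created above, so both data files end up referring to the
# same set of files when combined.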
aliases = PathAliases()
aliases.add("/home/ned/proj/src/", "./")
aliases.add(r"c:\ned\test", "./")
combine_parallel_data(covdata3, aliases=aliases)
self.assert_file_count(".coverage.*", 0)
self.assert_exists(".coverage")
apy = canonical_filename("./a.py")
sub_bpy = canonical_filename("./sub/b.py")
template_html = canonical_filename("./template.html")
assert_line_counts(covdata3, {apy: 4, sub_bpy: 2, template_html: 1}, fullpath=True)
assert_measured_files(covdata3, [apy, sub_bpy, template_html])
assert covdata3.file_tracer(template_html) == "html.plugin"
def test_combining_from_different_directories(self) -> None:
os.makedirs("cov1")
covdata1 = DebugCoverageData("cov1/.coverage.1")
covdata1.add_lines(LINES_1)
covdata1.write()
os.makedirs("cov2")
covdata2 = DebugCoverageData("cov2/.coverage.2")
covdata2.add_lines(LINES_2)
covdata2.write()
# This data won't be included.
covdata_xxx = DebugCoverageData(".coverage.xxx")
covdata_xxx.add_arcs(ARCS_3)
covdata_xxx.write()
covdata3 = DebugCoverageData()
combine_parallel_data(covdata3, data_paths=["cov1", "cov2"])
assert_line_counts(covdata3, SUMMARY_1_2)
assert_measured_files(covdata3, MEASURED_FILES_1_2)
self.assert_doesnt_exist("cov1/.coverage.1")
self.assert_doesnt_exist("cov2/.coverage.2")
self.assert_exists(".coverage.xxx")
def test_combining_from_files(self) -> None:
os.makedirs("cov1")
covdata1 = DebugCoverageData("cov1/.coverage.1")
covdata1.add_lines(LINES_1)
covdata1.write()
# Journal files should never be included in the combining.
self.make_file("cov1/.coverage.1-journal", "xyzzy")
os.makedirs("cov2")
covdata2 = DebugCoverageData("cov2/.coverage.2")
covdata2.add_lines(LINES_2)
covdata2.write()
# This data won't be included.
covdata_xxx = DebugCoverageData(".coverage.xxx")
covdata_xxx.add_arcs(ARCS_3)
covdata_xxx.write()
covdata_2xxx = DebugCoverageData("cov2/.coverage.xxx")
covdata_2xxx.add_arcs(ARCS_3)
covdata_2xxx.write()
covdata3 = DebugCoverageData()
combine_parallel_data(covdata3, data_paths=["cov1", "cov2/.coverage.2"])
assert_line_counts(covdata3, SUMMARY_1_2)
assert_measured_files(covdata3, MEASURED_FILES_1_2)
self.assert_doesnt_exist("cov1/.coverage.1")
self.assert_doesnt_exist("cov2/.coverage.2")
self.assert_exists(".coverage.xxx")
self.assert_exists("cov2/.coverage.xxx")
def test_combining_from_nonexistent_directories(self) -> None:
covdata = DebugCoverageData()
msg = "Couldn't combine from non-existent path 'xyzzy'"
with pytest.raises(NoDataError, match=msg):
combine_parallel_data(covdata, data_paths=["xyzzy"])
def test_interleaved_erasing_bug716(self) -> None:
# pytest-cov could produce this scenario. #716
covdata1 = DebugCoverageData()
covdata2 = DebugCoverageData()
# This used to create the .coverage database file,
covdata2.set_context("")
# then this would erase it all,
covdata1.erase()
# and then this would try to use tables that no longer exist:
# "no such table: meta"
covdata2.add_lines(LINES_1)
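# The directory and file name fragments below contain glob metacharacters
# like "[b-a]", so combining has to escape them rather than treat them as
# patterns when it searches for data files.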
@pytest.mark.parametrize(
"dpart, fpart",
[
("", "[b-a]"),
("[3-1]", ""),
("[3-1]", "[b-a]"),
],
)
def test_combining_with_crazy_filename(self, dpart: str, fpart: str) -> None:
dirname = f"py{dpart}"
basename = f"{dirname}/.coverage{fpart}"
os.makedirs(dirname)
covdata1 = CoverageData(basename=basename, suffix="1")
covdata1.add_lines(LINES_1)
covdata1.write()
covdata2 = CoverageData(basename=basename, suffix="2")
covdata2.add_lines(LINES_2)
covdata2.write()
covdata3 = CoverageData(basename=basename)
combine_parallel_data(covdata3)
assert_line_counts(covdata3, SUMMARY_1_2)
assert_measured_files(covdata3, MEASURED_FILES_1_2)
self.assert_file_count(glob.escape(basename) + ".*", 0)
def test_meta_data(self) -> None:
# The metadata written to the data file shouldn't interfere with
# hashing to remove duplicates, except for debug=process, which
# writes debugging info as metadata.
debug = DebugControlString(options=[])
covdata1 = CoverageData(basename="meta.1", debug=debug)
covdata1.add_lines(LINES_1)
covdata1.write()
with sqlite3.connect("meta.1") as con:
data = sorted(k for (k,) in con.execute("select key from meta"))
assert data == ["has_arcs", "version"]
debug = DebugControlString(options=["process"])
covdata2 = CoverageData(basename="meta.2", debug=debug)
covdata2.add_lines(LINES_1)
covdata2.write()
with sqlite3.connect("meta.2") as con:
data = sorted(k for (k,) in con.execute("select key from meta"))
assert data == ["has_arcs", "sys_argv", "version", "when"]
class DumpsLoadsTest(CoverageTest):
"""Tests of CoverageData.dumps and loads."""
run_in_temp_dir = False
@pytest.mark.parametrize("klass", [CoverageData, DebugCoverageData])
def test_serialization(self, klass: TCoverageData) -> None:
covdata1 = klass(no_disk=True)
covdata1.add_lines(LINES_1)
covdata1.add_lines(LINES_2)
serial = covdata1.dumps()
covdata2 = klass(no_disk=True)
covdata2.loads(serial)
assert_line_counts(covdata2, SUMMARY_1_2)
assert_measured_files(covdata2, MEASURED_FILES_1_2)
def test_misfed_serialization(self) -> None:
covdata = CoverageData(no_disk=True)
bad_data = b"Hello, world!\x07 " + b"z" * 100
msg = r"Unrecognized serialization: {} \(head of {} bytes\)".format(
re.escape(repr(bad_data[:40])),
len(bad_data),
)
with pytest.raises(DataError, match=msg):
covdata.loads(bad_data)
class NoDiskTest(CoverageTest):
"""Tests of in-memory CoverageData."""
run_in_temp_dir = False
def test_updating(self) -> None:
# https://github.com/nedbat/coveragepy/issues/1323
a = CoverageData(no_disk=True)
a.add_lines({"foo.py": [10, 20, 30]})
assert a.measured_files() == {"foo.py"}
b = CoverageData(no_disk=True)
b.update(a)
assert b.measured_files() == {"foo.py"}