| #!/usr/bin/env python3 |
| # SPDX-License-Identifier: Apache-2.0 |
| # ----------------------------------------------------------------------------- |
| # Copyright 2019-2024 Arm Limited |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy |
| # of the License at: |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| # ----------------------------------------------------------------------------- |
| """ |
| The image test runner is used for image quality and performance testing. |
| |
| It is designed to process directories of arbitrary test images, using the |
| directory structure and path naming conventions to self-describe how each image |
| is to be compressed. Some built-in test sets are provided in the ./Test/Images |
| directory, and others can be downloaded by running the astc_test_image_dl |
| script. |
| |
| Attributes: |
| RESULT_THRESHOLD_WARN: The result threshold (dB) for getting a WARN. |
| RESULT_THRESHOLD_FAIL: The result threshold (dB) for getting a FAIL. |
| TEST_BLOCK_SIZES: The block sizes we can test. This is a subset of the |
| block sizes supported by ASTC, simply to keep test run times |
| manageable. |
| """ |
| |
| import argparse |
| import os |
| import platform |
| import sys |
| |
| import testlib.encoder as te |
| import testlib.testset as tts |
| import testlib.resultset as trs |
| |
| # Require bit exact with reference scores |
| RESULT_THRESHOLD_WARN = -0.00 |
| RESULT_THRESHOLD_FAIL = -0.00 |
| RESULT_THRESHOLD_3D_FAIL = -0.00 |
| |
| |
| TEST_BLOCK_SIZES = ["4x4", "5x5", "6x6", "8x8", "12x12", "3x3x3", "6x6x6"] |
| |
| TEST_QUALITIES = ["fastest", "fast", "medium", "thorough", "verythorough", "exhaustive"] |
| |
| |
| def is_3d(blockSize): |
| """ |
| Is the given block size a 3D block type? |
| |
| Args: |
| blockSize (str): The block size. |
| |
| Returns: |
| bool: ``True`` if the block string is a 3D block size, ``False`` if 2D. |
| """ |
| return blockSize.count("x") == 2 |
| |
| |
| def count_test_set(testSet, blockSizes): |
| """ |
| Count the number of test executions needed for a test set. |
| |
| Args: |
| testSet (TestSet): The test set to run. |
| blockSizes (list(str)): The block sizes to run. |
| |
| Returns: |
| int: The number of test executions needed. |
| """ |
| count = 0 |
| for blkSz in blockSizes: |
| for image in testSet.tests: |
| # 3D block sizes require 3D images |
| if is_3d(blkSz) != image.is3D: |
| continue |
| |
| count += 1 |
| |
| return count |
| |
| |
| def determine_result(image, reference, result): |
| """ |
| Determine a test result against a reference and thresholds. |
| |
| Args: |
| image (TestImage): The image being compressed. |
| reference (Record): The reference result to compare against. |
| result (Record): The test result. |
| |
| Returns: |
| Result: The result code. |
| """ |
| dPSNR = result.psnr - reference.psnr |
| |
| if (dPSNR < RESULT_THRESHOLD_FAIL) and (not image.is3D): |
| return trs.Result.FAIL |
| |
| if (dPSNR < RESULT_THRESHOLD_3D_FAIL) and image.is3D: |
| return trs.Result.FAIL |
| |
| if dPSNR < RESULT_THRESHOLD_WARN: |
| return trs.Result.WARN |
| |
| return trs.Result.PASS |
| |
| |
| def format_solo_result(image, result): |
| """ |
| Format a metrics string for a single (no compare) result. |
| |
| Args: |
| image (TestImage): The image being tested. |
| result (Record): The test result. |
| |
| Returns: |
| str: The metrics string. |
| """ |
| name = "%5s %s" % (result.blkSz, result.name) |
| tPSNR = "%2.3f dB" % result.psnr |
| tTTime = "%.3f s" % result.tTime |
| tCTime = "%.3f s" % result.cTime |
| tCMTS = "%.3f MT/s" % result.cRate |
| |
| return "%-32s | %8s | %9s | %9s | %11s" % \ |
| (name, tPSNR, tTTime, tCTime, tCMTS) |
| |
| |
| def format_result(image, reference, result): |
| """ |
| Format a metrics string for a comparison result. |
| |
| Args: |
| image (TestImage): The image being tested. |
| reference (Record): The reference result to compare against. |
| result (Record): The test result. |
| |
| Returns: |
| str: The metrics string. |
| """ |
| dPSNR = result.psnr - reference.psnr |
| |
| try: |
| sTTime = reference.tTime / result.tTime |
| except ZeroDivisionError: |
| sTTime = float('NaN') |
| |
| try: |
| sCTime = reference.cTime / result.cTime |
| except ZeroDivisionError: |
| sCTime = float('NaN') |
| |
| name = "%5s %s" % (result.blkSz, result.name) |
| tPSNR = "%2.3f dB (% 1.3f dB)" % (result.psnr, dPSNR) |
| tTTime = "%.3f s (%1.2fx)" % (result.tTime, sTTime) |
| tCTime = "%.3f s (%1.2fx)" % (result.cTime, sCTime) |
| tCMTS = "%.3f MT/s" % (result.cRate) |
| result = determine_result(image, reference, result) |
| |
| return "%-32s | %22s | %15s | %15s | %11s | %s" % \ |
| (name, tPSNR, tTTime, tCTime, tCMTS, result.name) |
| |
| |
| def run_test_set(encoder, testRef, testSet, quality, blockSizes, testRuns, |
| keepOutput, threads): |
| """ |
| Execute all tests in the test set. |
| |
| Args: |
| encoder (EncoderBase): The encoder to use. |
| testRef (ResultSet): The test reference results. |
| testSet (TestSet): The test set. |
| quality (str): The quality level to execute the test against. |
| blockSizes (list(str)): The block sizes to execute each test against. |
| testRuns (int): The number of test repeats to run for each image test. |
| keepOutput (bool): Should the test preserve output images? This is |
| only a hint and discarding output may be ignored if the encoder |
| version used can't do it natively. |
| threads (int or None): The thread count to use. |
| |
| Returns: |
| ResultSet: The test results. |
| """ |
| resultSet = trs.ResultSet(testSet.name) |
| |
| curCount = 0 |
| maxCount = count_test_set(testSet, blockSizes) |
| |
| dat = (testSet.name, encoder.name, quality) |
| title = "Test Set: %s / Encoder: %s -%s" % dat |
| print(title) |
| print("=" * len(title)) |
| |
| for blkSz in blockSizes: |
| for image in testSet.tests: |
| # 3D block sizes require 3D images |
| if is_3d(blkSz) != image.is3D: |
| continue |
| |
| curCount += 1 |
| |
| dat = (curCount, maxCount, blkSz, image.testFile) |
| print("Running %u/%u %s %s ... " % dat, end='', flush=True) |
| res = encoder.run_test(image, blkSz, "-%s" % quality, testRuns, |
| keepOutput, threads) |
| res = trs.Record(blkSz, image.testFile, res[0], res[1], res[2], res[3]) |
| resultSet.add_record(res) |
| |
| if testRef: |
| refResult = testRef.get_matching_record(res) |
| res.set_status(determine_result(image, refResult, res)) |
| |
| try: |
| res.tTimeRel = refResult.tTime / res.tTime |
| except ZeroDivisionError: |
| res.tTimeRel = float('NaN') |
| |
| try: |
| res.cTimeRel = refResult.cTime / res.cTime |
| except ZeroDivisionError: |
| res.cTimeRel = float('NaN') |
| |
| res.psnrRel = res.psnr - refResult.psnr |
| |
| res = format_result(image, refResult, res) |
| else: |
| res = format_solo_result(image, res) |
| |
| print("\r[%3u] %s" % (curCount, res)) |
| |
| return resultSet |
| |
| |
| def get_encoder_params(encoderName, referenceName, imageSet): |
| """ |
| The the encoder and image set parameters for a test run. |
| |
| Args: |
| encoderName (str): The encoder name. |
| referenceName (str): The reference encoder name. |
| imageSet (str): The test image set. |
| |
| Returns: |
| tuple(EncoderBase, str, str, str): The test parameters for the |
| requested encoder and test set. An instance of the encoder wrapper |
| class, the output data name, the output result directory, and the |
| reference to use. |
| """ |
| # 1.7 variants |
| if encoderName == "ref-1.7": |
| encoder = te.Encoder1_7() |
| name = "reference-1.7" |
| outDir = "Test/Images/%s" % imageSet |
| refName = None |
| return (encoder, name, outDir, refName) |
| |
| if encoderName.startswith("ref"): |
| _, version, simd = encoderName.split("-") |
| |
| # 2.x, 3.x, and 4.x variants |
| compatible2xPrefixes = ["2.", "3.", "4.", "5."] |
| if any(True for x in compatible2xPrefixes if version.startswith(x)): |
| encoder = te.Encoder2xRel(version, simd) |
| name = f"reference-{version}-{simd}" |
| outDir = "Test/Images/%s" % imageSet |
| refName = None |
| return (encoder, name, outDir, refName) |
| |
| # Latest main |
| if version == "main": |
| encoder = te.Encoder2x(simd) |
| name = f"reference-{version}-{simd}" |
| outDir = "Test/Images/%s" % imageSet |
| refName = None |
| return (encoder, name, outDir, refName) |
| |
| assert False, f"Encoder {encoderName} not recognized" |
| |
| encoder = te.Encoder2x(encoderName) |
| name = "develop-%s" % encoderName |
| outDir = "TestOutput/%s" % imageSet |
| refName = referenceName.replace("ref", "reference") |
| return (encoder, name, outDir, refName) |
| |
| |
| def parse_command_line(): |
| """ |
| Parse the command line. |
| |
| Returns: |
| Namespace: The parsed command line container. |
| """ |
| parser = argparse.ArgumentParser() |
| |
| # All reference encoders |
| refcoders = ["ref-1.7", |
| "ref-2.5-neon", "ref-2.5-sse2", "ref-2.5-sse4.1", "ref-2.5-avx2", |
| "ref-3.7-neon", "ref-3.7-sse2", "ref-3.7-sse4.1", "ref-3.7-avx2", |
| "ref-4.8-neon", "ref-4.8-sse2", "ref-4.8-sse4.1", "ref-4.8-avx2", |
| "ref-5.0-neon", "ref-5.0-sse2", "ref-5.0-sse4.1", "ref-5.0-avx2", |
| "ref-main-neon", "ref-main-sve_256", "ref-main-sve_128", "ref-main-sse2", "ref-main-sse4.1", "ref-main-avx2"] |
| |
| # All test encoders |
| testcoders = ["none", "neon", "sve_256", "sve_128", "sse2", "sse4.1", "avx2", "native", "universal"] |
| testcodersAArch64 = ["neon", "sve_256", "sve_128"] |
| testcodersX86 = ["sse2", "sse4.1", "avx2"] |
| |
| coders = refcoders + testcoders + ["all-aarch64", "all-x86"] |
| |
| parser.add_argument("--encoder", dest="encoders", default="avx2", |
| choices=coders, help="test encoder variant") |
| |
| parser.add_argument("--reference", dest="reference", default="ref-main-avx2", |
| choices=refcoders, help="reference encoder variant") |
| |
| astcProfile = ["ldr", "ldrs", "hdr", "all"] |
| parser.add_argument("--color-profile", dest="profiles", default="all", |
| choices=astcProfile, help="test color profile") |
| |
| imgFormat = ["l", "xy", "rgb", "rgba", "all"] |
| parser.add_argument("--color-format", dest="formats", default="all", |
| choices=imgFormat, help="test color format") |
| |
| choices = list(TEST_BLOCK_SIZES) + ["all"] |
| parser.add_argument("--block-size", dest="blockSizes", |
| action="append", choices=choices, |
| help="test block size") |
| |
| testDir = os.path.dirname(__file__) |
| testDir = os.path.join(testDir, "Images") |
| testSets = [] |
| for path in os.listdir(testDir): |
| fqPath = os.path.join(testDir, path) |
| if os.path.isdir(fqPath): |
| testSets.append(path) |
| testSets.append("all") |
| |
| parser.add_argument("--test-set", dest="testSets", default="Small", |
| choices=testSets, help="test image test set") |
| |
| parser.add_argument("--test-image", dest="testImage", default=None, |
| help="select a specific test image from the test set") |
| |
| choices = list(TEST_QUALITIES) + ["all", "all+"] |
| parser.add_argument("--test-quality", dest="testQual", default="thorough", |
| choices=choices, help="select a specific test quality") |
| |
| parser.add_argument("--repeats", dest="testRepeats", default=1, |
| type=int, help="test iteration count") |
| |
| parser.add_argument("--keep-output", dest="keepOutput", default=False, |
| action="store_true", help="keep image output") |
| |
| parser.add_argument("-j", dest="threads", default=None, |
| type=int, help="thread count") |
| |
| |
| args = parser.parse_args() |
| |
| # Turn things into canonical format lists |
| if args.encoders == "all-aarch64": |
| args.encoders = testcodersAArch64 |
| elif args.encoders == "all-x86": |
| args.encoders = testcodersX86 |
| else: |
| args.encoders = [args.encoders] |
| |
| if args.testQual == "all+": |
| args.testQual = TEST_QUALITIES |
| elif args.testQual == "all": |
| args.testQual = TEST_QUALITIES |
| args.testQual.remove("verythorough") |
| args.testQual.remove("exhaustive") |
| else: |
| args.testQual = [args.testQual] |
| |
| if not args.blockSizes or ("all" in args.blockSizes): |
| args.blockSizes = TEST_BLOCK_SIZES |
| |
| args.testSets = testSets[:-1] if args.testSets == "all" \ |
| else [args.testSets] |
| |
| args.profiles = astcProfile[:-1] if args.profiles == "all" \ |
| else [args.profiles] |
| |
| args.formats = imgFormat[:-1] if args.formats == "all" \ |
| else [args.formats] |
| |
| return args |
| |
| |
| def main(): |
| """ |
| The main function. |
| |
| Returns: |
| int: The process return code. |
| """ |
| # Parse command lines |
| args = parse_command_line() |
| |
| testSetCount = 0 |
| worstResult = trs.Result.NOTRUN |
| |
| for quality in args.testQual: |
| for imageSet in args.testSets: |
| for encoderName in args.encoders: |
| (encoder, name, outDir, refName) = \ |
| get_encoder_params(encoderName, args.reference, imageSet) |
| |
| testDir = "Test/Images/%s" % imageSet |
| testRes = "%s/astc_%s_%s_results.csv" % (outDir, name, quality) |
| |
| testRef = None |
| if refName: |
| dat = (testDir, refName, quality) |
| testRefPath = "%s/astc_%s_%s_results.csv" % dat |
| testRef = trs.ResultSet(imageSet) |
| testRef.load_from_file(testRefPath) |
| |
| testSetCount += 1 |
| testSet = tts.TestSet(imageSet, testDir, |
| args.profiles, args.formats, args.testImage) |
| |
| resultSet = run_test_set(encoder, testRef, testSet, quality, |
| args.blockSizes, args.testRepeats, |
| args.keepOutput, args.threads) |
| |
| resultSet.save_to_file(testRes) |
| |
| if refName: |
| summary = resultSet.get_results_summary() |
| worstResult = max(summary.get_worst_result(), worstResult) |
| print(summary) |
| |
| if (testSetCount > 1) and (worstResult != trs.Result.NOTRUN): |
| print("OVERALL STATUS: %s" % worstResult.name) |
| |
| if worstResult == trs.Result.FAIL: |
| return 1 |
| |
| return 0 |
| |
| |
| if __name__ == "__main__": |
| sys.exit(main()) |
| |
| |
| if __name__ == "__main__": |
| sys.exit(main()) |