blob: bec57c16c21246ac4b3349f66287ba4b6f5aa283 [file] [log] [blame] [edit]
# -*- coding: utf-8 -*-
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A subcommand to try the file filter locally with a test result directory.
This subcommand prints a summary of the total file count and size of
before/after applying the filter.
Optionally, it prints out the filtered file list.
"""
from __future__ import print_function
import argparse
import asyncio
import logging
import pathlib
import processing_step
import result_file_filter
import test_result_registry
SUB_COMMAND_NAME = "try_filter"
_LOGGER = logging.getLogger("moblab_uploader")
_RC_NO_MATCHED_CLASS = 1
_PRINT_FILE_LIST_LIMIT = 100
def _directory_type(path):
"""Check `path` wrap it with a pathlib.Path object.
We convert input `path` to a abs path in order to help following steps.
"""
path = pathlib.Path(path)
# Convert relative path to abs path if needed. Don't use
# pathlib.Path.resolve here because it also follows symlink which may
# confuse selecting proper test result class.
if not path.is_absolute():
path = pathlib.Path(path).resolve()
if path.is_dir():
return path
raise argparse.ArgumentTypeError("Invalid directory name: {}".format(path))
def _get_file_size(result_file):
"""Get the size of a file."""
return result_file.abs_path.lstat().st_size
def _create_test_result_obj(abs_path, registry=None):
"""Create test result object for `path` according to test result registry.
Args:
abs_path: A pathlib.Path object with abs path name.
registry: A dict has all test result class registered.
"""
for registry_entry in registry or test_result_registry.REGISTRY:
root_dir = registry_entry.root_dir
for parent in abs_path.parents:
if parent == root_dir:
break
else:
continue
for cls in registry_entry.classes:
if abs_path.match(str(root_dir / cls.GLOB_PATTERN)):
return cls(
root_dir=root_dir,
relative_path=abs_path.relative_to(root_dir),
)
class _Counter(processing_step.ProcessStep):
"""A counter coroutine which count input."""
def __init__(self, *, method=None, next_step=None):
"""Constructor.
Args:
method: A func which count input and return an integer to be added
to total sum. Default is just return 1.
next_step: The following coroutine in the whole processing pipeline.
"""
super().__init__(next_step=next_step)
self._sum = 0
self._method = method or (lambda _: 1)
@property
def sum(self):
"""Return the count result."""
return self._sum
def reset(self):
"""Reset the counter."""
self._sum = 0
async def count(self, data):
"""A coroutine to count input and sum the result."""
self._sum += self._method(data)
if self._next_step:
await self._next_step(data)
class _FileNamePrinter(processing_step.ProcessStep):
"""A configurable class to print input file name."""
def __init__(self, *, next_step=None, limit=0):
super().__init__(next_step=next_step)
self._limit = limit
self._count = 0
_LOGGER.info(
"The filtered files will be listed, but up to %s files.", limit
)
async def print(self, result_file):
if self._limit:
self._count += 1
if self._count > self._limit:
return
print(result_file.relative_path)
def add_arguments(parent_parser):
"""Add arguments for this subcommand.
Args:
parent_parser: The argparse.ArgumentParser object to add arguments.
"""
parser = parent_parser.add_parser(
SUB_COMMAND_NAME,
help="Try the file filter locally with a real test result directory "
"and see the filtering result.",
)
parser.add_argument(
"-d",
"--dir",
type=_directory_type,
required=True,
help="The directory to test the file filter.",
)
parser.add_argument(
"-l",
"--list-result-files",
action="store_true",
help="Print out all the files after applying the filter.",
)
parser.set_defaults(func=try_file_filter)
def try_file_filter(args):
test_result = _create_test_result_obj(args.dir)
if not test_result:
_LOGGER.error(
"No registered class matches the directory: %s", args.dir
)
return _RC_NO_MATCHED_CLASS
files_generator = result_file_filter.ResultFileFilter(
filter_config=args.uploading_filter_config, force=True
)
file_counter = _Counter()
file_size_counter = _Counter(method=_get_file_size)
# The process steps:
# generate file list -> optionally filter them -> count file
# -> count file size -> optionally print the file name.
files_generator.next_step = file_counter.count
file_counter.next_step = file_size_counter.count
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(files_generator.list_all_files(test_result))
total_file_count = file_counter.sum
total_file_size = file_size_counter.sum
# Reset counters for counting filtered files.
file_counter.reset()
file_size_counter.reset()
if args.list_result_files:
file_size_counter.next_step = _FileNamePrinter(
limit=_PRINT_FILE_LIST_LIMIT
).print
loop.run_until_complete(files_generator.filter_files(test_result))
finally:
loop.close()
filtered_file_count = file_counter.sum
filtered_file_size = file_size_counter.sum
print(
"Filtering result: {} files: {} bytes => {} files: {} bytes. {:.0%} "
"space saved!".format(
total_file_count,
total_file_size,
filtered_file_count,
filtered_file_size,
1 - float(filtered_file_size) / total_file_size,
)
)
return 0