blob: 23ee60371c562d4ee824fe6d764c7031b0dac710 [file] [log] [blame]
# Copyright 2018 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import math
import os
import queue
import subprocess
from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import file_utils
from autotest_lib.client.cros import upstart
import archiver
import configurator
import helpers
import fake_printer
import log_reader
import multithreaded_processor
from six.moves import range
# Timeout for printing documents in seconds
_FAKE_PRINTER_TIMEOUT = 200
# Prefix for CUPS printer name
_FAKE_PRINTER_ID = 'FakePrinter'
# First port number to use, this test uses consecutive ports numbers.
_FIRST_PORT_NUMBER = 9100
# Command to run lpadmin as the lpadmin user.
_LPADMIN_COMMAND = ['sudo', '-u', 'lpadmin', '/usr/sbin/lpadmin']
class platform_PrinterPpds(test.test):
"""
This test gets a list of PPD files and a list of test documents. It tries
to add printer using each PPD file and to print all test documents on
every printer created this way. Becasue the number of PPD files to test can
be large (more then 3K), PPD files are tested simultaneously in many
threads.
"""
version = 3
def _get_filenames_from_PPD_indexes(self):
"""
It returns all PPD filenames from SCS server.
@returns a list of PPD filenames without duplicates
"""
# extracts PPD filenames from all 20 index files (in parallel)
outputs = self._processor.run(helpers.get_filenames_from_PPD_index, 20)
# joins obtained lists and performs deduplication
ppd_files = set()
for output in outputs:
ppd_files.update(output)
return list(ppd_files)
def _calculate_full_path(self, path):
"""
Converts path given as a parameter to absolute path.
@param path: a path set in configuration (relative, absolute or None)
@returns absolute path or None if the input parameter was None
"""
if path is None or os.path.isabs(path):
return path
path_current = os.path.dirname(os.path.realpath(__file__))
return os.path.join(path_current, path)
def _add_cups_printer(self, printer_id, uri, ppd_content):
"""
Adds a CUPS printer manually.
@param printer_id: The ID to assign to the new CUPS printer.
@param uri: The URI of the printer (e.g., 'socket://127.0.0.1:9100').
@param ppd_content: The content of the PPD file as bytes.
@raises Exception if the lpadmin command fails.
"""
cmd = _LPADMIN_COMMAND + ['-v', uri, '-p', printer_id, '-P', '-', '-E']
process = subprocess.Popen(cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = process.communicate(input=ppd_content)
if process.returncode != 0:
raise Exception('Failed to add printer {}: {} {}'.format(
printer_id, stdout.decode(), stderr.decode()))
def _remove_cups_printer(self, printer_id):
"""
Removes a CUPS printer.
@param printer_id: The ID of the CUPS printer to remove.
@raises Exception if the lpadmin command fails.
"""
cmd = _LPADMIN_COMMAND + ['-x', printer_id]
process = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
if process.returncode != 0:
raise Exception('Failed to remove printer {}: {} {}'.format(
printer_id, stdout.decode(), stderr.decode()))
def initialize(self,
path_docs,
path_ppds,
path_digests=None,
threads_count=8):
"""
@param path_docs: path to local directory with documents to print
@param path_ppds: path to local directory with PPD files to test;
the directory is supposed to be compressed as .tar.xz.
@param path_digests: path to local directory with digests files for
test documents; if None is set then content of printed
documents is not verified
@param threads_count: number of threads to use
"""
# Calculates absolute paths for all parameters
self._location_of_test_docs = self._calculate_full_path(path_docs)
self._location_of_PPD_files = self._calculate_full_path(path_ppds)
location_of_digests_files = self._calculate_full_path(path_digests)
# This object is used for running tasks in many threads simultaneously
self._processor = multithreaded_processor.MultithreadedProcessor(
threads_count)
# This object is responsible for parsing CUPS logs
self._log_reader = log_reader.LogReader()
# This object is responsible for the system configuration
self._configurator = configurator.Configurator()
# Read list of test documents
self._docs = helpers.list_entries_from_directory(
path=self._location_of_test_docs,
with_suffixes=('.pdf'),
nonempty_results=True,
include_directories=False)
# Load the list of PPD files to omit
do_not_test_path = self._calculate_full_path('do_not_test.txt')
do_not_test_set = set(helpers.load_lines_from_file(do_not_test_path))
# Unpack an archive with the PPD files:
path_archive = self._location_of_PPD_files + '.tar.xz'
path_target_dir = self._calculate_full_path('.')
file_utils.rm_dir_if_exists(os.path.join(path_target_dir, path_ppds))
subprocess.call(['tar', 'xJf', path_archive, '-C', path_target_dir])
# Load PPD files from the unpacked directory
self._ppds = helpers.list_entries_from_directory(
path=self._location_of_PPD_files,
with_suffixes=('.ppd', '.ppd.gz'),
nonempty_results=True,
include_directories=False)
# Remove from the list all PPD files to omit and sort it
self._ppds = list(set(self._ppds) - do_not_test_set)
self._ppds.sort()
# Load digests files
self._digests = dict()
self._sizes = dict()
if location_of_digests_files is None:
for doc_name in self._docs:
self._digests[doc_name] = dict()
self._sizes[doc_name] = dict()
else:
path_denylist = os.path.join(location_of_digests_files,
'denylist.txt')
denylist = helpers.load_lines_from_file(path_denylist)
for doc_name in self._docs:
digests_name = doc_name + '.digests'
path = os.path.join(location_of_digests_files, digests_name)
digests, sizes = helpers.parse_digests_file(path, denylist)
self._digests[doc_name] = digests
self._sizes[doc_name] = sizes
# Prepare the container with numbers of ports to use
self._ports_for_printers = queue.Queue()
# We do not want to reuse the same port number after immediately after
# closing the socket, so we reserve by average 2 ports number per one
# fake printer.
for i in range(threads_count * 2):
self._ports_for_printers.put(_FIRST_PORT_NUMBER + i)
def cleanup(self):
"""
Cleanup.
"""
# Resore previous system settings
self._configurator.restore()
# Delete directories with PPD files
path_ppds = self._calculate_full_path('ppds_100')
file_utils.rm_dir_if_exists(path_ppds)
path_ppds = self._calculate_full_path('ppds_all')
file_utils.rm_dir_if_exists(path_ppds)
def run_once(self, path_outputs=None):
"""
This is the main test function. It runs the testing procedure for
every PPD file. Tests are run simultaneously in many threads.
@param path_outputs: if it is not None, raw outputs sent
to printers are dumped here; the directory is overwritten if
already exists (is deleted and recreated)
@raises error.TestFail if at least one of the tests failed
"""
# Set directory for output documents
self._path_output_directory = self._calculate_full_path(path_outputs)
if self._path_output_directory is not None:
# Delete whole directory if already exists
file_utils.rm_dir_if_exists(self._path_output_directory)
# Create archivers
self._archivers = dict()
for doc_name in self._docs:
path_for_archiver = os.path.join(self._path_output_directory,
doc_name)
self._archivers[doc_name] = archiver.Archiver(path_for_archiver,
self._ppds, 50)
# A place for new digests
self._new_digests = dict()
self._new_sizes = dict()
for doc_name in self._docs:
self._new_digests[doc_name] = dict()
self._new_sizes[doc_name] = dict()
# We want to talk directly to cupsd without going through the UI. Stop
# the UI to eliminate sources of flakiness.
try:
upstart.stop_job('ui')
except error.CmdError as e:
logging.warning("Failed to stop ui: %s", e)
upstart.restart_job('cupsd')
# Runs tests for all PPD files (in parallel)
outputs = self._processor.run(self._thread_test_PPD, len(self._ppds))
# Analyses tests' outputs, prints a summary report and builds a list
# of PPD filenames that failed
failures = []
for i, output in enumerate(outputs):
ppd_file = self._ppds[i]
if output != True:
failures.append(ppd_file)
else:
output = 'OK'
line = "%s: %s" % (ppd_file, output)
logging.info(line)
# Calculate digests files for output documents (if dumped)
if self._path_output_directory is not None:
for doc_name in self._docs:
path = os.path.join(self._path_output_directory,
doc_name + '.digests')
helpers.save_digests_file(path, self._new_digests[doc_name],
self._new_sizes[doc_name], failures)
# Restore the ui job.
upstart.restart_job('ui')
# Raises an exception if at least one test failed
if len(failures) > 0:
failures.sort()
raise error.TestFail(
'Test failed for %d PPD files: %s'
% (len(failures), ', '.join(failures)) )
def _thread_test_PPD(self, task_id):
"""
Runs a test procedure for single PPD file.
It retrieves assigned PPD file and run for it a test procedure.
@param task_id: an index of the PPD file in self._ppds
@returns True when the test was passed or description of the error
(string) if the test failed
"""
# Gets content of the PPD file
try:
ppd_file = self._ppds[task_id]
if self._location_of_PPD_files is None:
# Downloads PPD file from the SCS server
ppd_content = helpers.download_PPD_file(ppd_file)
else:
# Reads PPD file from local filesystem
path_ppd = os.path.join(self._location_of_PPD_files, ppd_file)
with open(path_ppd, 'rb') as ppd_file_descriptor:
ppd_content = ppd_file_descriptor.read()
except BaseException as e:
return 'MISSING PPD: ' + str(e)
# Choose the port for the printer
port = self._ports_for_printers.get()
# Runs the test procedure
try:
self._PPD_test_procedure(ppd_file, ppd_content, port)
except BaseException as e:
return 'FAIL: ' + str(e)
finally:
self._ports_for_printers.put(port)
return True
def _PPD_test_procedure(self, ppd_name, ppd_content, port):
"""
Test procedure for single PPD file.
It tries to run the following steps:
1. Starts an instance of FakePrinter
2. Configures CUPS printer
3. For each test document run the following steps:
3a. Sends tests documents to the CUPS printer
3b. Fetches the raw document from the FakePrinter
3c. Parse CUPS logs and check for any errors
3d. If self._path_output_directory is set, save the raw document
and all intermediate steps in the provided directory
3e. If the size is available, verify the size of the
output document. *NOTE*: This test validated digests
in the past but it no longer does so. See b/429209202.
4. Removes CUPS printer and stops FakePrinter
If the test fails this method throws an exception.
@param ppd_name: a name of the PPD file
@param ppd_content: a content of the PPD file
@param port: a port for the printer
@throws Exception when the test fails
"""
# Create work directory for external pipelines and save the PPD file
# there (if needed)
path_ppd = None
try:
# Starts the fake printer
with fake_printer.FakePrinter(port) as printer:
# Add a CUPS printer manually with given ppd file
cups_printer_id = '%s_at_%05d' % (_FAKE_PRINTER_ID,port)
self._add_cups_printer(cups_printer_id,
'socket://127.0.0.1:%d' % port,
ppd_content)
# Prints all test documents
try:
for doc_name in self._docs:
# Full path to the test document
path_doc = os.path.join(
self._location_of_test_docs, doc_name)
# Sends test document to printer
argv = ['lp', '-d', cups_printer_id]
# This is a workaround for a bug b/292551154.
argv += ['-o', 'print-color-mode=rgb']
argv += [path_doc]
subprocess.call(argv)
# Prepare a workdir for the pipeline (if needed)
path_pipeline_workdir_temp = None
# Gets the output document from the fake printer
doc = printer.fetch_document(_FAKE_PRINTER_TIMEOUT)
digest = helpers.calculate_digest(doc)
# Retrive data from the log file
no_errors, logs, pipeline = \
self._log_reader.extract_result(
cups_printer_id, path_ppd, path_doc,
path_pipeline_workdir_temp)
# Archive obtained results in the output directory
if self._path_output_directory is not None:
self._archivers[doc_name].save_file(
ppd_name, '.out', doc, apply_gzip=True)
self._archivers[doc_name].save_file(
ppd_name, '.log', logs.encode())
if pipeline is not None:
self._archivers[doc_name].save_file(
ppd_name, '.sh', pipeline.encode())
# Set new digest
self._new_digests[doc_name][ppd_name] = digest
self._new_sizes[doc_name][ppd_name] = len(doc)
# Fail if any of CUPS filters failed
if not no_errors:
raise Exception('One of the CUPS filters failed')
# Check document's size (if known)
if ppd_name in self._sizes[doc_name]:
# Get the min/max tolerated PPD output size
minimum_size = \
math.floor(min(self._sizes[doc_name].values())*0.9)
maximum_size = \
math.ceil(max(self._sizes[doc_name].values())*1.2)
new_size = len(doc)
if minimum_size > new_size or new_size > maximum_size:
old_size = str(self._sizes[doc_name][ppd_name])
message = 'Document\'s size changed too much'
message += ', old size: ' + str(old_size)
message += ', new size: ' + str(new_size)
raise Exception(message)
else:
# Simple validation
if len(doc) < 16:
raise Exception('Empty output')
finally:
# Remove CUPS printer
self._remove_cups_printer(cups_printer_id)
# The fake printer is stopped at the end of "with" statement
finally:
# Finalize archivers and cleaning
if self._path_output_directory is not None:
for doc_name in self._docs:
self._archivers[doc_name].finalize_prefix(ppd_name)