# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Implementation of ScreenCalibration."""
import logging
import sys
from matplotlib import pyplot
from safetynet import Tuple, TypecheckMeta
import cv2
import numpy as np
import skimage.morphology as morphology
import skimage.transform as transform
import skimage.filters as filters
from optofidelity.util import nputil
from optofidelity.videoproc import DebugView, Filter, Shape
_log = logging.getLogger(__name__)
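# Typical usage (illustrative sketch; `video_reader` stands for a VideoReader
# instance and `raw_frame` for a single camera-space video frame):
#
#   calibration = ScreenCalibration.FromScreenFlashVideo(video_reader)
#   calibration.Validate()
#   stable = calibration.StabilizeFrame(raw_frame)
#   screen_space = calibration.CameraToScreenSpace(stable)
#   normalized = calibration.NormalizeFrame(screen_space, pwm_compensation=True)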
class ScreenCalibration(object):
__metaclass__ = TypecheckMeta
ENABLE_STABILIZATION = True
"""Enable image stabilization."""
SHAPE_MARGIN = 12
"""The test area is reduced in size by this margin to account for potential
camera shake."""
DIFF_THRESHOLD = 0.4
"""Threshold used on the difference between the on and off image to find
the test area."""
CLOSING_KERNEL = morphology.disk(8)
"""Disk-shaped kernel used for the closing operation to close any holes in
the detected shapes that might be caused by noise."""
MIN_AREA = 5000
"""Minimum area of the screen in square pixels."""
MIN_SOLIDITY = 0.9
"""Minimum solidity of the test_area."""
RECTIFICATION_SCALING = 1.2
"""Factor by which the rectified image will be enlarged to make sure we are
not losing any information due to interpolation."""
OUT_OF_VIEW_EDGE_PIXELS = 10
"""Number of test area pixels that may lie on the edge of the video frame
before the screen is considered out of view."""
WHITE_LEVEL_MIN = 0.8
"""Minimum level of the white reference image."""
WHITE_LEVEL_MAX = 0.95
"""Maximum level of the white reference image."""
PWM_COMPENSATION_LEVEL = 0.01
"""Compensate for PWM if the range of colors in the white reference image is
larger than this value."""
def __new__(cls, black_frame, white_frame, pwm_pulse=None):
"""Creates new screen calibration from black and white video frames.
If the white_frame is all white, the calibration will not do any
calibration or transformation but return the original images.
:param np.ndarray black_frame: Video frame of screen being black
:param np.ndarray white_frame: Video frame of screen being white
"""
self = super(ScreenCalibration, cls).__new__(cls)
self.is_identity = np.allclose(white_frame, 1.0)
self.pwm_profile = None
self.pwm_pulse = None
self.pwm_pulse_length = None
if pwm_pulse is not None:
self.pwm_pulse = pwm_pulse
self.pwm_profile = np.tile(pwm_pulse, 10)
self.pwm_pulse_length = len(pwm_pulse)
if self.is_identity:
self.shape = Shape(np.ones(white_frame.shape, dtype=np.bool))
self._rectification_transform = None
else:
self.shape = self._CalculateShape(black_frame, white_frame)
self._rectification_transform = self._EstimateRectTransform(self.shape)
self.black_reference = self.CameraToScreenSpace(black_frame)
self.white_reference = self.CameraToScreenSpace(white_frame)
self.black_frame = black_frame
self.white_frame = white_frame
# Detect features for tracking
image_width = self.black_frame.shape[1]
self._feature_range = (0, image_width)
self._good_features = None
if not self.is_identity:
space_left = self.shape.left
space_right = image_width - self.shape.right
if space_left > space_right:
# Trim to show only space left of the screen
self._feature_range = (0, self.shape.left - 100)
else:
# Trim to show only space right of the screen
self._feature_range = (self.shape.right + 200, image_width)
features_image = self._GetFeaturesImage(self.black_frame)
self._good_features = cv2.goodFeaturesToTrack(features_image, 200, 0.01,
30)
return self
@property
def camera_space_shape(self):
""":returns Tuple[int, int]: Array shape of camera space frames."""
return self.black_frame.shape
@property
def screen_space_shape(self):
""":returns Tuple[int, int]: Array shape of screen space frames."""
return self.black_reference.shape
@classmethod
def CreateIdentity(cls, frame_shape):
return cls(np.zeros(frame_shape), np.ones(frame_shape))
@classmethod
def FromScreenFlashVideo(cls, video_reader, debug=False):
"""Create screen calibration from a video showing a flashing screen.
:param VideoReader video_reader
:param bool debug
:returns ScreenCalibration
"""
with video_reader.PrefetchEnabled():
# Skim through video to find rough location of the screen.
prev_frame = None
delta_accum = np.zeros(video_reader.frame_shape)
for i, frame in video_reader.Frames(step=10):
if prev_frame is None:
prev_frame = frame
continue
delta = np.abs(frame - prev_frame)
delta_accum += delta
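# Pixels that changed strongly across the sampled frames belong to the
# flashing screen; the largest connected shape above the threshold is taken
# as the rough screen location.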
shapes = Shape.Shapes(delta_accum > 0.9)
screen_shape = max(shapes, key=lambda s: s.area)
# Walk frame-by-frame to find reference images for the black and the white screen.
ref_indices = []
frames = video_reader.Frames()
i, prev_frame = frames.next()
change_direction = 0
change_duration = 0
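# Track the direction of the screen's change and how long it has persisted;
# a reversal after a sustained change marks the previous frame as a black or
# white reference frame.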
for i, frame in frames:
# Calculate the mid-range value of the inter-frame difference.
# This reliably describes the general direction of change.
diff = (frame - prev_frame) * screen_shape.mask
midrange = Filter.StableMidRange(diff)
_log.debug("%4d: midrange=%.2f, dir=%d, dur=%d", i, midrange,
change_direction, change_duration)
if change_direction == 0:
# Once the screen is clearly changing in one direction, make note
# of the direction.
if np.abs(midrange) > 0.05:
change_direction = np.sign(midrange)
change_duration = 1
else:
change_duration += 1
if np.sign(midrange) != change_direction:
# Once the screen flips its change direction after a sustained change,
# the previous frame is a reference frame we want to pick.
if change_duration > 3:
ref_indices.append(i - 1)
if len(ref_indices) >= 2:
break
change_direction = 0
change_duration = 0
prev_frame = frame
if len(ref_indices) < 2:
raise Exception("Cannot find flashing screen")
# Identify black and white screen
black_ref = video_reader.FrameAt(ref_indices[0])
white_ref = video_reader.FrameAt(ref_indices[1])
white_index = ref_indices[1]
if np.mean(black_ref) > np.mean(white_ref):
(black_ref, white_ref) = (white_ref, black_ref)
white_index = ref_indices[0]
# Collect frames for PWM calibration
(height, width) = video_reader.frame_shape
pwm_frames = np.zeros((height, width, 6), dtype=np.float)
for i in range(6):
pwm_frames[:,:,i] = video_reader.FrameAt(white_index + i)
white_ref = np.max(pwm_frames, axis=2)
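# The per-pixel maximum over several consecutive frames removes transient
# PWM dark bands from the white reference.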
calibration = cls(black_ref, white_ref)
calibration.CalibratePWM(pwm_frames)
return calibration
def CalibratePWM(self, pwm_frames, debug=False):
def GetPWMProfile(pwm_frames, index):
"""Returns deviation from white reference as a profile."""
screen_space_pwm = self.CameraToScreenSpace(pwm_frames[:, :, index])
delta = self.white_reference - screen_space_pwm
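# Averaging over rows (axis 0) gives a per-column deviation profile in
# which PWM darkening shows up as peaks.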
profile = np.mean(delta, 0)
return profile
def ListPWMPulseResponses(pwm_frames):
"""Yields snippets of PWM pulses detected in the pwm_frames."""
for i in range(0, pwm_frames.shape[2]):
profile = GetPWMProfile(pwm_frames, i)
# No PWM artifacts? Great!
if np.max(profile) - np.min(profile) < 0.2:
return
# The PWM will show up as pulses from 0 to 1 once the profile is
# normalized; the state changes mark where each pulse starts and ends.
normalized = profile / np.max(profile)
for start, end in nputil.FindPeaks(normalized, min_value=0.9,
max_slope=0.1, max_mid_range=0.1):
if start < 10:
continue
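# Pad the detected pulse by 10 samples on each side so its rising and
# falling edges are included in the snippet.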
start = start - 10
end = np.min((end + 10, len(profile)))
pulse = profile[start:end]
yield pulse
def EstimateAveragePulseResponse(pulse_list):
"""Aligns all pulse responses and returns the averaged pulse response."""
max_pulse_length = np.max([len(p) for p in pulse_list])
pulse0 = np.copy(pulse_list[0])
pulse0.resize(max_pulse_length)
# Align all pulses to first pulse
aligned_pulses = []
for pulse in pulse_list:
shift = nputil.EstimateShift(pulse0, pulse, "mae", debug=debug)
matched = nputil.AlignArrays(pulse0, pulse, shift)
aligned_pulses.append(matched)
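# The element-wise maximum across the aligned pulses serves as the pulse
# envelope.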
# Slightly overestimate. Overcorrection is less of a problem than
# undercorrection.
estimate = np.max(np.asarray(aligned_pulses), axis=0)
estimate = estimate * 1.1
# Force the beginning and end of the response to 0
base_level = np.max((estimate[:2], estimate[-2:]))
estimate = estimate - base_level
estimate[estimate < 0] = 0
if debug:
pyplot.figure()
for pulse in aligned_pulses:
pyplot.plot(pulse)
pyplot.plot(estimate, "o")
pyplot.show()
return estimate
def EstimatePulseLength(pulse, pwm_frames):
"""Estimates the frequency of PWM pulses through cross-correlation."""
lengths = []
for i in range(0, pwm_frames.shape[2]):
profile = GetPWMProfile(pwm_frames, i)
# The correlation shows peaks at the spacing at which the pulse repeats in
# the profile.
corr = np.correlate(pulse, profile, "full")
corr = corr / np.max(corr)
# Find strong maximums
maximums = nputil.FindLocalExtremes(corr)
filtered_maximums = filter(lambda e: corr[e] > 0.5, maximums)
sorted_maximums = sorted(filtered_maximums)
# Distance between maximums is the pulse length
lengths.extend(np.diff(sorted_maximums))
if debug:
pyplot.figure()
pyplot.subplot(1, 2, 1)
pyplot.plot(profile)
pyplot.plot(pulse)
pyplot.subplot(1, 2, 2)
pyplot.plot(corr)
pyplot.plot(sorted_maximums, [corr[e] for e in sorted_maximums], "o")
if debug:
pyplot.show()
return int(np.mean(lengths))
pulse_list = list(ListPWMPulseResponses(pwm_frames))
if len(pulse_list) < 4:
_log.info("No PWM artifact compensation needed.")
return
pulse = EstimateAveragePulseResponse(pulse_list)
length = EstimatePulseLength(pulse, pwm_frames)
_log.info("Detected PWM with pulse length of %d", length)
pulse = np.copy(pulse)
pulse.resize(length)
self.pwm_pulse = pulse
self.pwm_profile = np.tile(pulse, 10)
self.pwm_pulse_length = length
def _GetFeaturesImage(self, camera_space_frame):
"""Trims image to area not showing the screen nor robot arm.
Returns a uint8 image for OpenCV
"""
range_ = self._feature_range
feature_image = camera_space_frame[:, range_[0]:range_[1]]
return (feature_image * 255).astype(np.uint8)
def StabilizeFrame(self, camera_space_frame, debug=False):
"""Stabilizes a frame to line up with the calibration image.
"""
if self._good_features is None or not self.ENABLE_STABILIZATION:
return camera_space_frame
frame = self._GetFeaturesImage(camera_space_frame)
ref = self._GetFeaturesImage(self.black_frame)
# Match features by tracking them from the reference frame into the current
# frame with pyramidal Lucas-Kanade optical flow.
features, st, err = cv2.calcOpticalFlowPyrLK(ref, frame,
self._good_features, None)
# Filter so only features that have been matched are in the list.
ref_features = self._good_features[st == 1]
features = features[st == 1]
# Take the median translation between the matched features (robust to
# outliers).
translation = np.median(features - ref_features, 0)
# Translate image
tform = transform.SimilarityTransform(translation=translation)
stable = transform.warp(camera_space_frame, tform)
if debug:
delta = (0.5 + stable - self.black_frame)
def GetCoords(feature):
return (int(feature[0] + self._feature_range[0]), int(feature[1]))
black = (0, 0, 0)
for good_feature in self._good_features:
print good_feature[0], GetCoords(good_feature[0])
cv2.circle(delta, GetCoords(good_feature[0]), 5, black)
for ref_feature, feature in zip(ref_features, features):
cv2.line(delta, GetCoords(ref_feature), GetCoords(feature), black, 1)
DebugView(debug=delta, title="Stabilization Debug")
return stable
def NormalizeFrame(self, screen_space_frame, pwm_compensation=False,
debug=False):
"""Normalizes color on a frame in screen space.
:param np.ndarray screen_space_frame
:returns np.ndarray
"""
normalized = Filter.Truncate((screen_space_frame - self.black_reference) /
(self.white_reference - self.black_reference))
if not pwm_compensation or self.pwm_profile is None:
return normalized
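# Estimate the current PWM phase from this frame: build a per-column
# deviation profile from pixels that are still close to white (delta < 0.8)
# so that dark foreground objects such as the robot arm do not bias it.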
delta = 1.0 - normalized
mask = delta < 0.8
profile = np.sum(delta * mask, 0) / np.sum(mask, 0)
shift = nputil.EstimateShift(profile, self.pwm_profile, "mae",
max_shift=self.pwm_pulse_length, debug=debug)
pwm_profile = 1.0 - nputil.AlignArrays(profile, self.pwm_profile, shift,
mode="wrap")
# Stretch the aligned profile across the full frame height.
stretched = np.tile(pwm_profile, (screen_space_frame.shape[0], 1))
stretched[stretched < 0.1] = 0.1 # prevent division by 0
compensated = Filter.Truncate(normalized / stretched)
if debug:
DebugView(orig=screen_space_frame, pwm=stretched, compensated=compensated)
return compensated
def Validate(self):
"""Runs some checks on the calibration.
This method will raise an Exception should the calibration show that
the screen is not fully shown in the camera view, the dynamic range is
too small or the screen is not big enough.
"""
if self.is_identity:
return
# Create a 1px border mask along the edge of the video frame. If the screen
# mask overlaps this border in too many pixels, the screen is probably
# outside the camera's field of view.
height, width = self.shape.mask.shape
frame_border = np.zeros((height, width), dtype=np.bool)
frame_border[:, 0] = True
frame_border[:, width - 1] = True
frame_border[0, :] = True
frame_border[height - 1, :] = True
border_overlap = np.sum(frame_border * self.shape.mask)
_log.info("border_overlap=%.2f", border_overlap)
if border_overlap > self.OUT_OF_VIEW_EDGE_PIXELS:
raise Exception("Screen out of camera view")
white_max = Filter.StableMax(self.white_reference)
_log.info("white_max=%.2f", white_max)
if white_max > self.WHITE_LEVEL_MAX:
raise Exception("Calibration is over exposed")
if white_max < self.WHITE_LEVEL_MIN:
raise Exception("Calibration is under exposed")
def CameraToScreenSpace(self, camera_space_frame):
"""Transform camera space frame into screen space.
:param np.ndarray camera_space_frame
:returns np.ndarray
"""
if self.is_identity:
return camera_space_frame
height = int(self.shape.height * self.RECTIFICATION_SCALING)
width = int(self.shape.width * self.RECTIFICATION_SCALING)
transformed = transform.warp(camera_space_frame,
self._rectification_transform,
output_shape=(height, width))
return transformed.astype(camera_space_frame.dtype)
def ScreenToCameraSpace(self, screen_space_frame):
"""Transform screen space frame into camera space.
:param np.ndarray screen_space_frame
:returns np.ndarray
"""
if self.is_identity:
return screen_space_frame
transformed = transform.warp(screen_space_frame,
self._rectification_transform.inverse,
output_shape=self.shape.mask.shape)
return transformed.astype(screen_space_frame.dtype)
def _EstimateRectTransform(self, shape):
"""Estimates transformation from rectified device screen to video frame.
:param Shape shape: shape object of the detected screen
:returns transform.Transform
"""
# Approximate rectangle from screen shape.
dest_coords = shape.ApproximatePolygon(4)
# Coords for transforms are x/y while array indexing is y/x, so we swap the
# columns.
dest_coords = dest_coords[:, ::-1]
# Coordinates in the target image. The target size is slightly inflated so
# we do not lose too much information during warping.
height = int(self.shape.height * self.RECTIFICATION_SCALING)
width = int(self.shape.width * self.RECTIFICATION_SCALING)
source_coords = np.array((
(0, 0),
(width, 0),
(width, height),
(0, height)
))
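# Estimate the projective transform that maps the rectified screen corners
# onto the detected screen corners in the camera frame.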
tform = transform.ProjectiveTransform()
tform.estimate(source_coords, dest_coords)
return tform
def _CalculateShape(self, black_frame, white_frame):
"""Detect screen from difference between the black and white frame.
:returns Shape: shape object of the detected screen
"""
delta = Filter.Truncate(white_frame - black_frame)
delta = filters.gaussian_filter(delta, 5)
margin_kernel = morphology.disk(self.SHAPE_MARGIN)
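# Threshold the smoothed difference image, close small holes caused by
# noise, then erode by SHAPE_MARGIN to guard against camera shake.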
binary = delta > self.DIFF_THRESHOLD
binary = morphology.binary_closing(binary, self.CLOSING_KERNEL)
binary = morphology.binary_erosion(binary, margin_kernel)
shapes = Shape.Shapes(binary)
shapes = [s for s in shapes
if (s.area > self.MIN_AREA and
s.props.solidity > self.MIN_SOLIDITY)]
if not shapes:
raise Exception("Cannot find screen shape")
return max(shapes, key=lambda s: s.area)
def __getnewargs__(self):
return (self.black_frame, self.white_frame, self.pwm_pulse)
def __getstate__(self):
return {}