blob: 613ee6bdea1ba3d456217e7db283368abd4322a8 [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from collections import namedtuple
import logging
import numpy as np
import numpy.linalg as linalg
from optofidelity.videoproc import Canvas, Filter, Shape, DebugView
import skimage.filters
from ._detector import Detector
from .events import Event, LEDEvent
_log = logging.getLogger(__name__)
Flash = namedtuple("Flash", ("coords", "color"))
class LEDDetector(Detector):
"""Detects flashing LEDs."""
NAME = "led"
LED_SCREEN_MARGIN = 50
"""Margin around the screen when blacking out the screen for LED
detection."""
LED_GAUSSIAN_SIGMA = 5.0
"""Sigma of gaussian filter for smoothing."""
LED_MIN_DIFF_THRESHOLD = 0.2
"""Pixels that have changed by at least this amount from the last frame are
candidates for a blinking LED."""
LED_MIN_AREA = 100
"""Minimum size of an LED in the min filtered and thresholded picture."""
SCREEN_MAX_DELTA_THRESH = 0.9
"""Maximum average delta of the screen before suppressing LED events."""
DEBOUNCE_DURATION = 5
"""Number of camera frames to ignore after an LED switched."""
LED_MIN_DISTANCE = 50
"""Distance [in px] between two flashes to be considered separate LEDs."""
def __init__(self):
self.recent_flash_times = []
def Preprocess(self, calib_frame, debugger):
def log(msg, *params):
_log.debug("%04d: " + msg, calib_frame.frame_index, *params)
# Create mask showing everything outside the active screen.
array_shape = calib_frame.camera_space_shape
if calib_frame.has_calibration:
margin = self.LED_SCREEN_MARGIN
screen_shape = calib_frame.camera_space_screen_shape
left = (screen_shape.left - margin)
if left < 0:
left = 0
right = (screen_shape.right + margin)
if right > array_shape[1]:
right = array_shape[1]
area_of_interest = Shape.FromRectangle(array_shape, left=left,
right=right, bottom=screen_shape.top - margin)
else:
area_of_interest = Shape(np.ones(array_shape, dtype=np.bool))
if debugger:
debugger.camera_space_canvas.DrawMask(Canvas.GREEN,
area_of_interest.contour)
# If the screen is switching, reflections of the screen will cause parts
# of the robot to look like a flashing LED. So We want to suppress any
# LED events during this time.
if calib_frame.has_calibration:
screen_profile = np.mean(calib_frame.screen_space_delta, 0)
screen_max_delta = np.max(np.abs(screen_profile))
log("screen_max_delta=%.2f", screen_max_delta)
if screen_max_delta > self.SCREEN_MAX_DELTA_THRESH:
log("Suspending LED detection")
return []
# Look for an LED shaped large change in the masked area.
delta = calib_frame.camera_space_delta * area_of_interest.mask
delta = skimage.filters.gaussian_filter(delta, self.LED_GAUSSIAN_SIGMA)
binary = np.abs(delta) > self.LED_MIN_DIFF_THRESHOLD
flashes = []
for shape in Shape.Shapes(binary):
draw_color = Canvas.BLUE
if shape.area > self.LED_MIN_AREA:
color = np.mean(calib_frame.camera_space_delta[shape.mask])
flashes.append(Flash(shape.center, color))
draw_color = Canvas.RED
log("Flash: center=%s, color=%.2f", shape.center, color)
if debugger:
debugger.camera_space_canvas.DrawMask(draw_color, shape.contour)
return flashes
def GenerateEvents(self, preprocessed_data, debug=False):
for frame_index, flashes in enumerate(preprocessed_data):
if flashes is None:
continue
updated = [(flash, time) for flash, time in self.recent_flash_times
if time + self.DEBOUNCE_DURATION >= frame_index]
self.recent_flash_times = updated
def has_recently_flashed(flash):
for recent_flash, time in self.recent_flash_times:
distance = linalg.norm(flash.coords - recent_flash.coords)
if distance < self.LED_MIN_DISTANCE:
return True
return False
# If a flash happens a frame right after another one, assume it is part of
# the same LED and add all flashes to the recent flash times but do not
# generate another event.
last_frame_flashes = [(flash, time) for flash, time in updated
if frame_index - time <= 1]
if len(last_frame_flashes):
for flash in flashes:
self.recent_flash_times.append((flash, frame_index - 1))
continue
events = []
for flash in flashes:
if has_recently_flashed(flash):
continue
self.recent_flash_times.append((flash, frame_index))
state = Event.STATE_ON if flash.color > 0 else Event.STATE_OFF
events.append(LEDEvent(frame_index, state=state))
# There may be multiple flashes due to reflections, return only one event.
if len(events) >= 1:
yield events[0]