blob: b9a2bb9e952f60e623189730ed618422525c05b4 [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Numerical python utilities.
Numpy offers a lot, but not everything we need. This module includes a couple
of algorithms for working with 1 dimensional arrays.
"""
import numpy as np
import scipy.signal as signal
from matplotlib import pyplot
def FindZeroCrossings(array):
"""Returns indices of zero crossings.
The returned indices point to the number AFTER the zero crossing.
:param np.ndarray array: 1 dimensional array of floating point values.
"""
pos = array > 0
npos = ~pos
return ((pos[:-1] & npos[1:]) | (npos[:-1] & pos[1:])).nonzero()[0] + 1
def NonNan(array):
return array[~np.isnan(array)]
def MinMaxRange(array):
nonnan = NonNan(array)
if not len(nonnan):
return 0
return np.max(NonNan(array)) - np.min(NonNan(array))
def FindLocalExtremes(array):
"""Returns local extreme indices of array."""
deriv1 = np.diff(array)
return FindZeroCrossings(deriv1)
def Truncate(array, min=0.0, max=1.0):
"""Truncates number or array to min and max."""
if isinstance(array, np.ndarray):
array[array < min] = min
array[array > max] = max
return array
else:
if array < min:
return min
if array > max:
return max
return array
def Normalize(array):
"""Normalize array to 0..1"""
min = np.min(NonNan(array))
max = np.max(NonNan(array))
if np.allclose(min, max):
return array
return (array - min) / (max - min)
def NormalizeStates(array, crossing_threshold=0.5):
"""Normalize array to 0..1"""
pre_norm = Normalize(array)
min = np.median(pre_norm[pre_norm < crossing_threshold])
max = np.median(pre_norm[pre_norm > crossing_threshold])
return (pre_norm - min) / (max - min)
def HysteresisThreshold(array, t_low, t_high, initial=False):
"""Hysteresis thresholding of array."""
result = np.empty(array.shape, dtype=np.bool)
current = initial
for index, value in enumerate(array):
if value > t_high:
current = True
elif value < t_low:
current = False
result[index] = current
return result
def LowPass(array, kernel_size=5):
"""Returns low pass filtered array."""
kernel = np.ones(kernel_size,) / kernel_size
array = np.pad(array, (kernel_size / 2, kernel_size / 2), "edge")
return signal.convolve(array, kernel, "valid")
def BoxFilter(array, box_size):
filtered = np.zeros(array.shape)
current = np.nan
for i in range(len(array)):
if np.isnan(array[i]):
current = np.nan
elif np.isnan(current) or np.abs(array[i] - current) > box_size:
current = array[i]
filtered[i] = current
return filtered
def DeDuplicate(items, min_distance):
if not len(items):
return
for i in range(len(items) - 1):
if items[i+1] - items[i] > min_distance:
yield items[i]
yield items[len(items) - 1]
def Segment(binary_array):
"""Returns list of start, end indicies of connected True values."""
if np.all(~binary_array):
return
delta_array = np.diff(binary_array.astype(np.int))
start = 0
for i, delta in enumerate(delta_array):
if delta > 0:
start = i + 1
elif delta < 0:
yield (start, i)
start = 0
if start > 0:
yield (start, len(binary_array)-1)
def EstimateShift(static, shifting, technique, max_shift=None, debug=False):
"""Estimates shift between two arrays.
:param np.ndarray static: Array that stays stationary.
:param np.ndarray shifting: Array that is supposed to be shifted to match
the static array.
:param str technique: Describes which error measure to use.
- mae: Mean absolute error
- mse: Mean square error
:param Optional[int] max_shift: Maximum shift allowed.
:param bool debug: Show matplotlib debug window after calculation
"""
if max_shift is None:
max_shift = len(shifting)
if technique == "mae":
metric = lambda deltas: np.mean(np.abs(deltas))
elif technique == "mse":
metric = lambda deltas: np.mean(deltas ** 2)
else:
raise ValueError("Unknown technique=%s" % technique)
errors = []
for shift in range(max_shift):
shifted = AlignArrays(static, shifting, shift, mode="wrap")
delta = shifted - static
errors.append(metric(delta[~np.isnan(delta)]))
shift = np.argmin(errors)
if debug:
pyplot.figure()
shifted = AlignArrays(static, shifting, shift, mode="wrap")
pyplot.subplot(2, 1, 0)
pyplot.plot(static)
pyplot.plot(shifted)
pyplot.plot(static - shifted)
pyplot.subplot(2, 1, 1)
pyplot.plot(errors)
pyplot.show()
return shift
def Shift(array, shift, mode="edge"):
"""Shift array left or right without wrap-around.
:param str mode: How to deal with empty space in shifted array
- edge: Fill empty space with value of array at the edge
- wrap: Wrap around array when shifting
"""
if mode == "edge":
if shift > 0:
return np.pad(array, (shift, 0), mode="edge")[:-shift]
else:
shift = -shift
return np.pad(array, (0, shift), mode="edge")[shift:]
elif mode == "wrap":
return np.roll(array, shift)
else:
raise ValueError("No such mode: %s" % mode)
def AlignArrays(target, array, shift, mode="edge"):
"""Aligns two arrays by shifting one and trimming it to the same size"""
shifted = np.copy(Shift(array, shift, mode))
shifted.resize((len(target)))
return shifted
def FindSettlingIndex(profile, start, window_size=8,
min_value=None, max_value=None, max_mid_range=None,
max_slope=None, max_distance=None, max_mean_distance=None,
max_rel_mean_distance=None, combination="and",
debug=False):
"""Returns index of first settled value."""
smoothed = LowPass(profile, 5)
increasing = smoothed[start] < smoothed[start + 1]
for i in range(start, len(profile) - 1):
debug_values = []
conditions = []
def AddCondition(threshold, value, name, condition):
if threshold is None:
return
if debug:
debug_values.append(u"%s=%.2f%s" % (name, value, u"\u2713" if condition else ""))
conditions.append(condition)
def MinCondition(threshold, value, name):
AddCondition(threshold, value, name, value > threshold)
def MaxCondition(threshold, value, name):
AddCondition(threshold, value, name, value < threshold)
value = profile[i]
window_end = Truncate(i + window_size, 0, len(profile))
moving_window = profile[i:(window_end + 1)]
window_mean = np.mean(moving_window[1:])
window_std = np.std(moving_window[1:])
mean_distance = window_mean - value
if not increasing:
mean_distance *= -1;
MinCondition(min_value, value, "v")
MaxCondition(max_value, value, "v")
MaxCondition(max_slope, np.abs(profile[i + 1] - profile[i]), "s")
MaxCondition(max_mid_range, np.max(moving_window) - np.min(moving_window), "mr")
MaxCondition(max_mean_distance, mean_distance, "md")
MaxCondition(max_rel_mean_distance, mean_distance / window_std, "rmd")
if debug:
separator = u" %s " % combination
print "%03d: d=%d" % (i, (i - start)), separator.join(debug_values)
if max_distance is not None and (i - start) > max_distance:
return None
if combination == "and":
if np.all(conditions):
return i
elif combination == "or":
if np.any(conditions):
return i
else:
raise ValueError("Combination has to be one of: (and, or)")
return len(profile) - 1
def FindSettlingIndexReverse(profile, start, debug=False, **kwargs):
rprofile = profile[::-1]
rstart = len(profile) - start - 1
if debug:
print "Reverse %d->%d" % (start, rstart)
rindex = FindSettlingIndex(rprofile, rstart, debug=debug, **kwargs)
return len(profile) - rindex - 1
def FindStateChanges(array, low_thresholds, high_thresholds,
crossing_threshold=0.5, debug=False):
"""Returns tuples of indices to the start and end of state changes.
The input array is expected to be an array between 0..1. A state change
is a change of values from 0 to 1. The change can happen over multiple values,
and this method will return the indices of the start and end of this change.
The start index points to the first value that breaks away from the original
state, and the end index points to the first value that is considered to be
settled into the new state.
"""
# Find indices when the array is crossing the threshold, then find the start
# and end for each crossing.
crossings = FindZeroCrossings(LowPass(array, 5) - crossing_threshold)
for crossing in crossings:
prev = array[max(crossing - 5, 0)]
if array[crossing] - prev > 0:
start = FindSettlingIndexReverse(array, crossing, debug=debug,
**low_thresholds)
end = FindSettlingIndex(array, crossing, debug=debug, **high_thresholds)
else:
start = FindSettlingIndexReverse(array, crossing, debug=debug,
**high_thresholds)
end = FindSettlingIndex(array, crossing, debug=debug, **low_thresholds)
if start is not None and end is not None:
yield (start, end)
def FindPeaks(array, crossing_threshold=0.5, **kwargs):
for segment_start, segment_end in Segment(array > crossing_threshold):
start = FindSettlingIndexReverse(array, segment_start, **kwargs)
end = FindSettlingIndex(array, segment_end, **kwargs)
if start is not None and end is not None:
yield (start, end)