Just for fun.
The below generates graphics like the following:
Each graph is based on a function \theta(x) that shall be averaged.
We try:
- Exponentially decaying average with v(x<0) assumed to be 0
- Exponentially decaying average with v(0) assumed to be \theta(0)
- Exponentially decaying average with v(x<0) assumed to be 0, but bias corrected
- Standard average of a sliding window of the last N values (where N = 1/(1-\beta), beta the decay constant.
from typing import Final, Dict, Tuple, List
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.axes as mplaxes
# Draw "sliding average" graphs of a few noisy functions
# https://numpy.org/doc/stable/reference/generated/numpy.arange.html
# https://numpy.org/doc/stable/reference/random/generated/numpy.random.Generator.normal.html#numpy.random.Generator.normal
# https://numpy.org/doc/stable/reference/generated/numpy.sin.html
# https://numpy.org/doc/stable/reference/generated/numpy.linspace.html#numpy.linspace
# https://numpy.org/doc/stable/reference/generated/numpy.multiply.html
def build_bias_corr(size: int, beta: float) -> np.ndarray:
bias_corr = np.zeros((size, 1), dtype=np.float64)
# what is the bias correction for the first value? set it to 1
bias_corr[0, 0] = 1.0
for i in range(1, size):
bias_corr[i, 0] = 1.0 / (1.0 - beta ** i)
return bias_corr
def build_v_zeros_if_negative(theta: np.ndarray, beta: float) -> np.ndarray:
size: Final[int] = theta.shape[0]
v_zeros_if_negative = np.zeros((size, 1), dtype=np.float64)
v_zeros_if_negative[0, 0] = (1 - beta) * theta[0, 0]
for i in range(1, size):
v_zeros_if_negative[i, 0] = (1 - beta) * theta[i, 0] + beta * v_zeros_if_negative[i - 1, 0]
return v_zeros_if_negative
def build_v_starts_with_theta0(theta: np.ndarray, beta: float) -> np.ndarray:
size: Final[int] = theta.shape[0]
v_starts_with_theta0 = np.zeros((size, 1), dtype=np.float64)
v_starts_with_theta0[0, 0] = theta[0, 0]
for i in range(1, size):
v_starts_with_theta0[i, 0] = (1 - beta) * theta[i, 0] + beta * v_starts_with_theta0[i - 1, 0]
return v_starts_with_theta0
def build_v_with_bias_corr(theta: np.ndarray, bias_corr: np.ndarray, beta: float) -> np.ndarray:
return np.multiply(build_v_zeros_if_negative(theta, beta), bias_corr)
def build_v_sliding_avg(theta: np.ndarray, beta: float) -> Tuple[np.ndarray, int]:
size: Final[int] = theta.shape[0]
avg_size = np.rint(1.0 / (1.0 - beta)).astype(int)
v_sliding_avg = np.zeros((size, 1), dtype=np.float64)
for i in range(avg_size - 1, size):
v_sliding_avg[i, 0] = np.sum(theta[i - avg_size + 1:i, 0], axis=0) / avg_size
return (v_sliding_avg, avg_size)
def build_v(theta: np.ndarray, bias_corr: np.ndarray, beta: float) -> Dict:
v_zeros_if_negative = build_v_zeros_if_negative(theta, beta)
v_starts_with_theta0 = build_v_starts_with_theta0(theta, beta)
v_bias_corr = build_v_with_bias_corr(theta, bias_corr, beta)
v_sliding_avg, avg_size = build_v_sliding_avg(theta, beta)
max_y = np.max([np.max(theta),
np.max(v_zeros_if_negative),
np.max(v_starts_with_theta0),
np.max(v_bias_corr),
np.max(v_sliding_avg)])
min_y = np.min([np.min(theta),
np.min(v_zeros_if_negative),
np.min(v_starts_with_theta0),
np.min(v_bias_corr),
np.min(v_sliding_avg)])
return {"theta": theta,
"v_zeros_if_negative": v_zeros_if_negative,
"v_starts_with_theta0": v_starts_with_theta0,
"v_bias_corr": v_bias_corr,
"v_sliding_avg": v_sliding_avg,
"avg_size": avg_size,
"y_range": (min_y, max_y)}
def find_limits(x: np.ndarray, dict_list: List[Dict]) -> Dict:
min_x = x[0, 0]
max_x = x[-1, 0]
width = max_x - min_x
min_y_list = []
max_y_list = []
for dict in dict_list:
min_y, max_y = dict["y_range"]
min_y_list.append(min_y)
max_y_list.append(max_y)
min_y = min(min_y_list)
max_y = max(max_y_list)
height = max_y - min_y
return {"min_x": min_x,
"max_x": max_x,
"min_y": min_y,
"max_y": max_y,
"width": width,
"height": height}
def set_axis_limits(ax: mplaxes.Axes, limits: Dict) -> None:
min_x = limits["min_x"]
max_x = limits["max_x"]
min_y = limits["min_y"]
max_y = limits["max_y"]
width = limits["width"]
height = limits["height"]
ax.set_xlim(min_x - width / 10, max_x + width / 10)
ax.set_ylim(min_y - height / 10, max_y + height / 10)
ax.set_aspect(aspect='equal')
def plot_sublot_noisy(x: np.ndarray, ax: mplaxes.Axes, dict_noisy_foo: Dict, noise_std_deviation, name: str) -> None:
avg_size = dict_noisy_foo["avg_size"]
ax.plot(x, dict_noisy_foo["theta"], label=f'{name} (σ = {noise_std_deviation:.2f})', marker='x', linestyle='none')
ax.plot(x, dict_noisy_foo["v_zeros_if_negative"], label='v, v(x<0) = 0', marker='.', linestyle='none')
ax.plot(x, dict_noisy_foo["v_starts_with_theta0"], label='v, v(0) = Θ(0)', marker='.', linestyle='none')
ax.plot(x, dict_noisy_foo["v_bias_corr"], label='v, v(x<0) = 0, bias corr.', marker='.', linestyle='none')
restricted_x = x[avg_size - 1:, 0]
restricted_v = dict_noisy_foo["v_sliding_avg"][avg_size - 1:, 0]
ax.plot(restricted_x, restricted_v, label=f'v, sliding avg (window size = {avg_size})', marker='.', linestyle='none')
ax.legend()
ax.grid()
def plot_sublot_nonoise(x: np.ndarray, ax: mplaxes.Axes, dict_noisy_foo: Dict, name: str) -> None:
avg_size = dict_noisy_foo["avg_size"]
ax.plot(x, dict_noisy_foo["theta"], label=f'{name}', marker='x', linestyle='none')
ax.plot(x, dict_noisy_foo["v_zeros_if_negative"], label='v, v(x<0) = 0', marker='.', linestyle='none')
ax.plot(x, dict_noisy_foo["v_starts_with_theta0"], label='v, v(0) = Θ(0)', marker='.', linestyle='none')
ax.plot(x, dict_noisy_foo["v_bias_corr"], label='v, v(x<0) = 0, bias corr.', marker='.', linestyle='none')
restricted_x = x[avg_size - 1:, 0]
restricted_v = dict_noisy_foo["v_sliding_avg"][avg_size - 1:, 0]
ax.plot(restricted_x, restricted_v, label=f'v, sliding avg (window size = {avg_size})', marker='.', linestyle='none')
ax.legend()
ax.grid()
def plot(x: np.ndarray,
dict_noisy_line: Dict,
dict_noisy_sinus: Dict,
dict_swinging: Dict,
dict_cosine: Dict,
noise_std_deviation) -> None:
fig, axes = plt.subplots(2, 2, figsize=(12, 12))
# "axes" is Axes or array of Axes https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.html#matplotlib.axes.Axes
ax_noisy_line = axes[0, 0]
ax_noisy_sinus = axes[0, 1]
ax_swinging = axes[1, 0]
ax_cosine = axes[1, 1]
limits: Dict = find_limits(x, [dict_noisy_line, dict_noisy_sinus, dict_swinging, dict_cosine])
set_axis_limits(ax_noisy_line, limits)
set_axis_limits(ax_noisy_sinus, limits)
set_axis_limits(ax_swinging, limits)
set_axis_limits(ax_cosine, limits)
plot_sublot_noisy(x, ax_noisy_line, dict_noisy_line, noise_std_deviation, "noisy line")
plot_sublot_noisy(x, ax_noisy_sinus, dict_noisy_sinus, noise_std_deviation, "noisy sinus")
plot_sublot_nonoise(x, ax_swinging, dict_swinging, "superposed sin,cos,sin")
plot_sublot_nonoise(x, ax_cosine, dict_cosine, "cosine")
plt.tight_layout()
plt.show()
def build_noisy_sinus(x: np.ndarray, noise: np.ndarray) -> np.ndarray:
two_pi: Final[float] = 2 * np.pi
size = x.shape[0]
period_num: Final[float] = 2 # number of periods of noisy sinus
amplitude: Final[float] = 10 # amplitude of noisy sinus
y_offset: Final[float] = 10 # offset of noisy sinus
x_values: Final[np.ndarray] = np.linspace(0.0, two_pi * period_num, num=size, dtype=np.float64).reshape(-1, 1)
assert x_values[0, 0] == 0.0
assert np.isclose(x_values[size - 1, 0].squeeze(), two_pi * period_num)
return np.sin(x_values) * amplitude + noise + y_offset
def build_swinging(x: np.ndarray) -> np.ndarray:
two_pi: Final[float] = 2 * np.pi
size = x.shape[0]
x1_values: Final[np.ndarray] = np.linspace(0.0, two_pi * 0.5, num=size, dtype=np.float64).reshape(-1, 1)
x2_values: Final[np.ndarray] = np.linspace(0.0, two_pi * 2, num=size, dtype=np.float64).reshape(-1, 1)
x3_values: Final[np.ndarray] = np.linspace(0.0, two_pi * 4, num=size, dtype=np.float64).reshape(-1, 1)
return np.sin(x1_values) * 20 + np.cos(x2_values) * 3 + np.sin(x3_values)
def build_cosine(x: np.ndarray) -> np.ndarray:
two_pi: Final[float] = 2 * np.pi
size = x.shape[0]
x_values: Final[np.ndarray] = np.linspace(0.0, two_pi * 2, num=size, dtype=np.float64).reshape(-1, 1)
return np.cos(x_values) * 10
def build_noisy_line(x: np.ndarray, noise: np.ndarray) -> np.ndarray:
slope: Final[float] = 0.5 # slope of noisy line
return x * slope + noise
def main():
# use this initialized RNG for reproducible results:
# rng = np.random.default_rng(seed=42)
# use this RNG that depends on current time for interesting results:
rng = np.random.default_rng()
#
beta: Final[float] = 0.9 # beta of the running averages (factor applied to the previous average)
size: Final[int] = 50 # number of data points in the graph
#
x: Final[np.ndarray] = np.arange(0, size, dtype=np.int32).reshape(-1, 1)
assert x.shape == (size, 1)
#
noise_std_deviation: Final[float] = 4.0 # stddev of normal noise
noise: Final[np.ndarray] = rng.normal(size=size, scale=noise_std_deviation).reshape(-1, 1)
assert noise.shape == (size, 1)
#
bias_corr: Final[np.ndarray] = build_bias_corr(size, beta)
#
noisy_line: Final[np.ndarray] = build_noisy_line(x, noise)
noisy_sinus: Final[np.ndarray] = build_noisy_sinus(x, noise)
swinging: Final[np.ndarray] = build_swinging(x)
cosine: Final[np.ndarray] = build_cosine(x)
#
dict_noisy_line = build_v(noisy_line, bias_corr, beta)
dict_noisy_sinus = build_v(noisy_sinus, bias_corr, beta)
dict_swinging = build_v(swinging, bias_corr, beta)
dict_cosine = build_v(cosine, bias_corr, beta)
#
plot(x, dict_noisy_line, dict_noisy_sinus, dict_swinging, dict_cosine, noise_std_deviation)
main()