sos_filter: Second Order Sections MCU Filter

This is an implementation of the SOS fliltering algorithm that runs on the MCU.

The filter opperates on data in fixed point format to avoid use of the FPU as klipper does not support FPU usage.

This host object handles duties of initalizing and resetting the filter so client dont have to declare their own commands for these opperations. Clients can select how many integer bits they want to use for both the filter coefficients and the filters output value. An arbitrary number of filter sections can be configured. Filters can be designed on the fly with the SciPy library or loaded from another source.

Signed-off-by: Gareth Farrington <gareth@waves.ky>
This commit is contained in:
Gareth Farrington 2025-03-19 09:22:45 -07:00 committed by Kevin O'Connor
parent 0181023954
commit cb0c38f7d8
5 changed files with 407 additions and 0 deletions

232
klippy/extras/sos_filter.py Normal file
View File

@ -0,0 +1,232 @@
# Second Order Sections Filter
#
# Copyright (C) 2025 Gareth Farrington <gareth@waves.ky>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
MAX_INT32 = (2 ** 31)
MIN_INT32 = -(2 ** 31) - 1
def assert_is_int32(value, error):
if value > MAX_INT32 or value < MIN_INT32:
raise OverflowError(error)
return value
# convert a floating point value to a 32 bit fixed point representation
# checks for overflow
def to_fixed_32(value, int_bits):
fractional_bits = (32 - (1 + int_bits))
fixed_val = int(value * (2 ** fractional_bits))
return assert_is_int32(fixed_val, "Fixed point Q%i overflow"
% (int_bits,))
# Digital filter designer and container
class DigitalFilter:
def __init__(self, sps, cfg_error, highpass=None, highpass_order=1,
lowpass=None, lowpass_order=1, notches=None, notch_quality=2.0):
self.filter_sections = []
self.initial_state = []
self.sample_frequency = sps
# an empty filter can be created without SciPi/numpy
if not (highpass or lowpass or notches):
return
try:
import scipy.signal as signal
except:
raise cfg_error("DigitalFilter require the SciPy module")
if highpass:
self.filter_sections.append(
self._butter(highpass, "highpass", highpass_order))
if lowpass:
self.filter_sections.append(
self._butter(lowpass, "lowpass", lowpass_order))
for notch_freq in notches:
self.filter_sections.append(self._notch(notch_freq, notch_quality))
if len(self.filter_sections) > 0:
self.initial_state = signal.sosfilt_zi(self.filter_sections)
def _butter(self, frequency, btype, order):
import scipy.signal as signal
return signal.butter(order, Wn=frequency, btype=btype,
fs=self.sample_frequency, output='sos')[0]
def _notch(self, freq, quality):
import scipy.signal as signal
b, a = signal.iirnotch(freq, Q=quality, fs=self.sample_frequency)
return signal.tf2sos(b, a)[0]
def get_filter_sections(self):
return self.filter_sections
def get_initial_state(self):
return self.initial_state
# container that accepts SciPy formatted SOS filter data and converts it to a
# selected fixed point representation. This data could come from DigitalFilter,
# static data, config etc.
class FixedPointSosFilter:
# filter_sections is an array of SciPy formatted SOS filter sections (sos)
# initial_state is an array of SciPy formatted SOS state sections (zi)
def __init__(self, filter_sections=None, initial_state=None,
coeff_int_bits=2, value_int_bits=15):
filter_sections = [] if filter_sections is None else filter_sections
initial_state = [] if initial_state is None else initial_state
num_sections = len(filter_sections)
num_state = len(initial_state)
if num_state != num_sections:
raise ValueError("The number of filter sections (%i) and state "
"sections (%i) must be equal" % (
num_sections, num_state))
self._coeff_int_bits = self._validate_int_bits(coeff_int_bits)
self._value_int_bits = self._validate_int_bits(value_int_bits)
self._filter = self._convert_filter(filter_sections)
self._state = self._convert_state(initial_state)
def get_filter_sections(self):
return self._filter
def get_initial_state(self):
return self._state
def get_coeff_int_bits(self):
return self._coeff_int_bits
def get_value_int_bits(self):
return self._value_int_bits
def get_num_sections(self):
return len(self._filter)
def _validate_int_bits(self, int_bits):
if int_bits < 1 or int_bits > 30:
raise ValueError("The number of integer bits (%i) must be a"
" value between 1 and 30" % (int_bits,))
return int_bits
# convert the SciPi SOS filters to fixed point format
def _convert_filter(self, filter_sections):
sos_fixed = []
for section in filter_sections:
nun_coeff = len(section)
if nun_coeff != 6:
raise ValueError("The number of filter coefficients is %i"
", must be 6" % (nun_coeff,))
fixed_section = []
for col, coeff in enumerate(section):
if col != 3: # omit column 3
fixed_coeff = to_fixed_32(coeff, self._coeff_int_bits)
fixed_section.append(fixed_coeff)
elif coeff != 1.0: # double check colum 3 is always 1.0
raise ValueError("Coefficient 3 is expected to be 1.0"
" but was %f" % (coeff,))
sos_fixed.append(fixed_section)
return sos_fixed
# convert the SOS filter state matrix (zi) to fixed point format
def _convert_state(self, filter_state):
sos_state = []
for section in filter_state:
nun_states = len(section)
if nun_states != 2:
raise ValueError(
"The number of state elements is %i, must be 2"
% (nun_states,))
fixed_state = []
for col, value in enumerate(section):
fixed_state.append(to_fixed_32(value, self._value_int_bits))
sos_state.append(fixed_state)
return sos_state
# Control an `sos_filter` object on the MCU
class SosFilter:
# fixed_point_filter should be an FixedPointSosFilter instance. A filter of
# size 0 will create a passthrough filter.
# max_sections should be the largest number of sections you expect
# to use at runtime. The default is the size of the fixed_point_filter.
def __init__(self, mcu, cmd_queue, fixed_point_filter, max_sections=None):
self._mcu = mcu
self._cmd_queue = cmd_queue
self._oid = self._mcu.create_oid()
self._filter = fixed_point_filter
self._max_sections = max_sections
if self._max_sections is None:
self._max_sections = self._filter.get_num_sections()
self._cmd_set_section = [
"sos_filter_set_section oid=%d section_idx=%d"
" sos0=%i sos1=%i sos2=%i sos3=%i sos4=%i",
"sos_filter_set_section oid=%c section_idx=%c"
" sos0=%i sos1=%i sos2=%i sos3=%i sos4=%i"]
self._cmd_config_state = [
"sos_filter_set_state oid=%d section_idx=%d state0=%i state1=%i",
"sos_filter_set_state oid=%c section_idx=%c state0=%i state1=%i"]
self._cmd_activate = [
"sos_filter_set_active oid=%d n_sections=%d coeff_int_bits=%d",
"sos_filter_set_active oid=%c n_sections=%c coeff_int_bits=%c"]
self._mcu.register_config_callback(self._build_config)
def _build_config(self):
cmds = [self._cmd_set_section, self._cmd_config_state,
self._cmd_activate]
for cmd in cmds:
cmd.append(self._mcu.lookup_command(cmd[1], cq=self._cmd_queue))
def get_oid(self):
return self._oid
# create an uninitialized filter object
def create_filter(self):
self._mcu.add_config_cmd("config_sos_filter oid=%d max_sections=%u"
% (self._oid, self._max_sections))
self._configure_filter(is_init=True)
# either setup an init command or send the command based on a flag
def _cmd(self, command, args, is_init=False):
if is_init:
self._mcu.add_config_cmd(command[0] % args, is_init=True)
else:
command[2].send(args)
def _set_filter_sections(self, is_init=False):
for i, section in enumerate(self._filter.get_filter_sections()):
args = (self._oid, i, section[0], section[1], section[2],
section[3], section[4])
self._cmd(self._cmd_set_section, args, is_init)
def _set_filter_state(self, is_init=False):
for i, state in enumerate(self._filter.get_initial_state()):
args = (self._oid, i, state[0], state[1])
self._cmd(self._cmd_config_state, args, is_init)
def _activate_filter(self, is_init=False):
args = (self._oid, self._filter.get_num_sections(),
self._filter.get_coeff_int_bits())
self._cmd(self._cmd_activate, args, is_init)
# configure the filter sections on the mcu
# filters should be an array of filter sections in SciPi SOS format
# sos_filter_state should be an array of zi filter state elements
def _configure_filter(self, is_init=False):
num_sections = self._filter.get_num_sections()
if num_sections > self._max_sections:
raise ValueError("Too many filter sections: %i, The max is %i"
% (num_sections, self._max_sections,))
# convert to fixed point to find errors
# no errors, state is accepted
# configure MCU filter and activate
self._set_filter_sections(is_init)
self._set_filter_state(is_init,)
self._activate_filter(is_init)
# Change the filter coefficients and state at runtime
# fixed_point_filter should be an FixedPointSosFilter instance
# cq is an optional command queue to for command sequencing
def change_filter(self, fixed_point_filter):
self._filter = fixed_point_filter
self._configure_filter(False)
# Resets the filter state back to initial conditions at runtime
# cq is an optional command queue to for command sequencing
def reset_filter(self):
self._set_filter_state(False)
self._activate_filter(False)

View File

@ -177,6 +177,10 @@ config NEED_SENSOR_BULK
depends on WANT_ADXL345 || WANT_LIS2DW || WANT_MPU9250 || WANT_ICM20948 \
|| WANT_HX71X || WANT_ADS1220 || WANT_LDC1612 || WANT_SENSOR_ANGLE
default y
config NEED_SOS_FILTER
bool
depends on WANT_HX71X || WANT_ADS1220
default y
menu "Optional features (to reduce code size)"
depends on HAVE_LIMITED_CODE_SIZE
config WANT_ADC

View File

@ -27,3 +27,4 @@ src-$(CONFIG_WANT_ADS1220) += sensor_ads1220.c
src-$(CONFIG_WANT_LDC1612) += sensor_ldc1612.c
src-$(CONFIG_WANT_SENSOR_ANGLE) += sensor_angle.c
src-$(CONFIG_NEED_SENSOR_BULK) += sensor_bulk.c
src-$(CONFIG_NEED_SOS_FILTER) += sos_filter.c

158
src/sos_filter.c Normal file
View File

@ -0,0 +1,158 @@
// Second Order sections Filter implementation using Fixed Point math
//
// Copyright (C) 2025 Gareth Farrington <gareth@waves.ky>
//
// This file may be distributed under the terms of the GNU GPLv3 license.
#include "basecmd.h" // oid_alloc
#include "command.h" // DECL_COMMAND
#include "sched.h" // shutdown
#include "sos_filter.h" // sos_filter
typedef int32_t fixedQ_coeff_t;
typedef int32_t fixedQ_value_t;
// filter strucutre sizes
#define SECTION_WIDTH 5
#define STATE_WIDTH 2
struct sos_filter_section {
// filter composed of second order sections
fixedQ_coeff_t coeff[SECTION_WIDTH]; // aka sos
fixedQ_value_t state[STATE_WIDTH]; // aka zi
};
struct sos_filter {
uint8_t max_sections, n_sections, coeff_frac_bits, is_active;
uint32_t coeff_rounding;
// filter composed of second order sections
struct sos_filter_section filter[0];
};
inline uint8_t
overflows_int32(int64_t value) {
return value > (int64_t)INT32_MAX || value < (int64_t)INT32_MIN;
}
// Multiply a coefficient in fixedQ_coeff_t by a value fixedQ_value_t
static inline fixedQ_value_t
fixed_mul(struct sos_filter *sf, const fixedQ_coeff_t coeff
, const fixedQ_value_t value) {
// This optimizes to single cycle SMULL on Arm Coretex M0+
int64_t product = (int64_t)coeff * (int64_t)value;
// round up at the last bit to be shifted away
product += sf->coeff_rounding;
// shift the decimal right to discard the coefficient fractional bits
int64_t result = product >> sf->coeff_frac_bits;
// check for overflow of int32_t
if (overflows_int32(result)) {
shutdown("fixed_mul: overflow");
}
// truncate significant 32 bits
return (fixedQ_value_t)result;
}
// Apply the sosfilt algorithm to a new datapoint
// returns the fixedQ_value_t filtered value
int32_t
sosfilt(struct sos_filter *sf, const int32_t unfiltered_value) {
if (!sf->is_active) {
shutdown("sos_filter not property initialized");
}
// an empty filter performs no filtering
if (sf->n_sections == 0) {
return unfiltered_value;
}
fixedQ_value_t cur_val = unfiltered_value;
// foreach section
for (int section_idx = 0; section_idx < sf->n_sections; section_idx++) {
struct sos_filter_section *section = &(sf->filter[section_idx]);
// apply the section's filter coefficients to input
fixedQ_value_t next_val = fixed_mul(sf, section->coeff[0], cur_val);
next_val += section->state[0];
section->state[0] = fixed_mul(sf, section->coeff[1], cur_val)
- fixed_mul(sf, section->coeff[3], next_val)
+ (section->state[1]);
section->state[1] = fixed_mul(sf, section->coeff[2], cur_val)
- fixed_mul(sf, section->coeff[4], next_val);
cur_val = next_val;
}
return (int32_t)cur_val;
}
// Create an sos_filter
void
command_config_sos_filter(uint32_t *args)
{
uint32_t max_sections = args[1];
uint32_t size = offsetof(struct sos_filter, filter[max_sections]);
struct sos_filter *sf = oid_alloc(args[0]
, command_config_sos_filter, size);
sf->max_sections = max_sections;
sf->is_active = 0;
}
DECL_COMMAND(command_config_sos_filter, "config_sos_filter oid=%c"
" max_sections=%u");
// Lookup an sos_filter
struct sos_filter *
sos_filter_oid_lookup(uint8_t oid)
{
return oid_lookup(oid, command_config_sos_filter);
}
// Set one section of the filter
void
command_sos_filter_set_section(uint32_t *args)
{
struct sos_filter *sf = sos_filter_oid_lookup(args[0]);
// setting a section marks the filter as inactive
sf->is_active = 0;
uint8_t section_idx = args[1];
// copy section data
const uint8_t arg_base = 2;
for (uint8_t i = 0; i < SECTION_WIDTH; i++) {
sf->filter[section_idx].coeff[i] = args[i + arg_base];
}
}
DECL_COMMAND(command_sos_filter_set_section
, "sos_filter_set_section oid=%c section_idx=%c"
" sos0=%i sos1=%i sos2=%i sos3=%i sos4=%i");
// Set the state of one section of the filter
void
command_sos_filter_set_state(uint32_t *args)
{
struct sos_filter *sf = sos_filter_oid_lookup(args[0]);
// setting a section's state marks the filter as inactive
sf->is_active = 0;
// copy state data
uint8_t section_idx = args[1];
const uint8_t arg_base = 2;
sf->filter[section_idx].state[0] = args[0 + arg_base];
sf->filter[section_idx].state[1] = args[1 + arg_base];
}
DECL_COMMAND(command_sos_filter_set_state
, "sos_filter_set_state oid=%c section_idx=%c state0=%i state1=%i");
// Set one section of the filter
void
command_sos_filter_activate(uint32_t *args)
{
struct sos_filter *sf = sos_filter_oid_lookup(args[0]);
uint8_t n_sections = args[1];
if (n_sections > sf->max_sections) {
shutdown("Filter section count larger than max_sections");
}
sf->n_sections = n_sections;
const uint8_t coeff_int_bits = args[2];
sf->coeff_frac_bits = (31 - coeff_int_bits);
sf->coeff_rounding = (1 << (sf->coeff_frac_bits - 1));
// mark filter as ready to use
sf->is_active = 1;
}
DECL_COMMAND(command_sos_filter_activate
, "sos_filter_set_active oid=%c n_sections=%c coeff_int_bits=%c");

12
src/sos_filter.h Normal file
View File

@ -0,0 +1,12 @@
#ifndef __SOS_FILTER_H
#define __SOS_FILTER_H
#include <stdint.h>
struct sos_filter;
int32_t sosfilt(struct sos_filter *sf
, const int32_t unfiltered_value);
struct sos_filter *sos_filter_oid_lookup(uint8_t oid);
#endif // sos_filter.h