klipper/klippy/extras/adxl345.py
Kevin O'Connor e34137582d adxl345: Implement timing via new adxl345_status messages
Query the adxl345 message counter every 100ms so that accurate timing
can be obtained during measurements.  This allows the adxl345 data to
be exported with timestamps while captures are running.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
2021-08-30 12:25:50 -04:00

417 lines
19 KiB
Python

# Support for reading acceleration data from an adxl345 chip
#
# Copyright (C) 2020-2021 Kevin O'Connor <kevin@koconnor.net>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import logging, time, collections, threading, multiprocessing, os
from . import bus, motion_report
# ADXL345 register addresses
REG_DEVID = 0x00
REG_BW_RATE = 0x2C
REG_POWER_CTL = 0x2D
REG_DATA_FORMAT = 0x31
REG_FIFO_CTL = 0x38

# Modifier bits for the register byte of an SPI transfer (REG_MOD_READ
# is OR'ed into the register address when reading a register)
REG_MOD_READ = 0x80
REG_MOD_MULTI = 0x40

# Supported values of the "rate" config option, each mapped to the
# value that gets written to REG_BW_RATE
QUERY_RATES = {
    25: 0x8,
    50: 0x9,
    100: 0xa,
    200: 0xb,
    400: 0xc,
    800: 0xd,
    1600: 0xe,
    3200: 0xf,
}

# Value expected when reading REG_DEVID
ADXL345_DEV_ID = 0xe5

# Earth gravity in mm/s**2
FREEFALL_ACCEL = 9.80665 * 1000.
# 3.9mg/LSB * Earth gravity in mm/s**2
SCALE = 0.0039 * FREEFALL_ACCEL
# A single decoded sample: timestamp plus acceleration on each axis
Accel_Measurement = collections.namedtuple(
    'Accel_Measurement', ('time', 'accel_x', 'accel_y', 'accel_z'))

# Helper class to obtain measurements
#
# Wraps a client connection ("cconn") that accumulates sample messages
# and remembers the print-time window during which samples were
# requested.
class ADXL345QueryHelper:
    # printer: the printer object (used to look up the toolhead)
    # cconn: client connection providing get_messages() and finalize()
    def __init__(self, printer, cconn):
        self.printer = printer
        self.cconn = cconn
        print_time = printer.lookup_object('toolhead').get_last_move_time()
        self.request_start_time = self.request_end_time = print_time
        self.samples = []
    # Record the end of the request window, wait for queued moves to
    # complete and then close the client connection.
    def finish_measurements(self):
        toolhead = self.printer.lookup_object('toolhead')
        self.request_end_time = toolhead.get_last_move_time()
        toolhead.wait_moves()
        self.cconn.finalize()
    # Convert the messages held by the client connection into a list of
    # Accel_Measurement tuples restricted to the request window.  The
    # result is cached in self.samples and returned.  If the connection
    # has no messages the previously cached list is returned unchanged.
    def decode_samples(self):
        raw_samples = self.cconn.get_messages()
        if not raw_samples:
            return self.samples
        total = sum(len(m['params']['data']) for m in raw_samples)
        count = 0
        # Pre-size the list for the worst case; trimmed after filtering
        self.samples = samples = [None] * total
        for msg in raw_samples:
            for samp_time, x, y, z in msg['params']['data']:
                if samp_time < self.request_start_time:
                    continue
                if samp_time > self.request_end_time:
                    # Remaining samples of this message are skipped
                    break
                samples[count] = Accel_Measurement(samp_time, x, y, z)
                count += 1
        del samples[count:]
        return self.samples
    # Write the samples to "filename" in CSV format (one header line
    # followed by one line per sample).  The file is always closed,
    # even if decoding or formatting a sample raises.
    def _write_samples(self, filename):
        with open(filename, "w") as f:
            f.write("#time,accel_x,accel_y,accel_z\n")
            samples = self.samples or self.decode_samples()
            for t, accel_x, accel_y, accel_z in samples:
                f.write("%.6f,%.6f,%.6f,%.6f\n" % (
                    t, accel_x, accel_y, accel_z))
    # Write the samples to "filename" from a separate daemon process.
    # This method returns as soon as that process has been started.
    def write_to_file(self, filename):
        def write_impl():
            try:
                # Try to re-nice writing process
                os.nice(20)
            except Exception:
                # Lowering the priority is best-effort only
                pass
            self._write_samples(filename)
        write_proc = multiprocessing.Process(target=write_impl)
        write_proc.daemon = True
        write_proc.start()
# Helper class for G-Code commands
#
# Registers the accelerometer commands for one chip.  Every command is
# registered as a mux command keyed on the CHIP parameter.
class ADXLCommandHelper:
    def __init__(self, config, chip):
        self.printer = config.get_printer()
        self.chip = chip
        # Client started by ACCELEROMETER_MEASURE (None when not active)
        self.bg_client = None
        # The chip name is the second word of the config section name
        name_parts = config.get_name().split()
        self.name = name_parts[1] if len(name_parts) > 1 else "default"
        self.register_commands(self.name)
        if self.name == "default":
            # Also register with a CHIP value of None
            self.register_commands(None)
    # Register all commands of this helper under the given CHIP value
    def register_commands(self, name):
        gcode = self.printer.lookup_object('gcode')
        command_table = (
            ("ACCELEROMETER_MEASURE", self.cmd_ACCELEROMETER_MEASURE,
             self.cmd_ACCELEROMETER_MEASURE_help),
            ("ACCELEROMETER_QUERY", self.cmd_ACCELEROMETER_QUERY,
             self.cmd_ACCELEROMETER_QUERY_help),
            ("ADXL345_DEBUG_READ", self.cmd_ADXL345_DEBUG_READ,
             self.cmd_ADXL345_DEBUG_READ_help),
            ("ADXL345_DEBUG_WRITE", self.cmd_ADXL345_DEBUG_WRITE,
             self.cmd_ADXL345_DEBUG_WRITE_help),
        )
        for cmd_name, handler, help_text in command_table:
            gcode.register_mux_command(cmd_name, "CHIP", name, handler,
                                       desc=help_text)
    # Raise a command error if the chip is currently measuring
    def _require_idle(self, gcmd):
        if self.chip.is_measuring():
            raise gcmd.error("adxl345 measurements in progress")
    # Parse an integer given in any base accepted by int(x, 0)
    @staticmethod
    def _parse_int(text):
        return int(text, 0)
    cmd_ACCELEROMETER_MEASURE_help = "Start/stop accelerometer"
    # The first invocation starts a capture; the next one stops it and
    # writes the captured data to a csv file in /tmp.
    def cmd_ACCELEROMETER_MEASURE(self, gcmd):
        if not self.chip.is_measuring():
            # Start measurements
            self.bg_client = self.chip.start_internal_client()
            gcmd.respond_info("adxl345 measurements started")
            return
        if self.bg_client is None:
            # Chip is measuring, but not on behalf of this command
            raise gcmd.error("adxl345 measurements in progress")
        # End measurements
        name = gcmd.get("NAME", time.strftime("%Y%m%d_%H%M%S"))
        plain_name = name.replace('-', '').replace('_', '')
        if not plain_name.isalnum():
            raise gcmd.error("Invalid adxl345 NAME parameter")
        bg_client, self.bg_client = self.bg_client, None
        bg_client.finish_measurements()
        # Write data to file
        if self.name != "default":
            filename = "/tmp/adxl345-%s-%s.csv" % (self.name, name,)
        else:
            filename = "/tmp/adxl345-%s.csv" % (name,)
        bg_client.write_to_file(filename)
        gcmd.respond_info("Writing raw accelerometer data to %s file"
                          % (filename,))
    cmd_ACCELEROMETER_QUERY_help = "Query accelerometer for the current values"
    # Capture during a one second dwell and report the last sample
    def cmd_ACCELEROMETER_QUERY(self, gcmd):
        self._require_idle(gcmd)
        aclient = self.chip.start_internal_client()
        toolhead = self.printer.lookup_object('toolhead')
        toolhead.dwell(1.)
        aclient.finish_measurements()
        values = aclient.decode_samples()
        if not values:
            raise gcmd.error("No adxl345 measurements found")
        _, accel_x, accel_y, accel_z = values[-1]
        msg = "adxl345 values (x, y, z): %.6f, %.6f, %.6f" % (
            accel_x, accel_y, accel_z)
        gcmd.respond_info(msg)
    cmd_ADXL345_DEBUG_READ_help = "Query accelerometer register (for debugging)"
    # Report the value of register REG (limited to 29..57)
    def cmd_ADXL345_DEBUG_READ(self, gcmd):
        self._require_idle(gcmd)
        reg = gcmd.get("REG", minval=29, maxval=57, parser=self._parse_int)
        val = self.chip.read_reg(reg)
        gcmd.respond_info("ADXL345 REG[0x%x] = 0x%x" % (reg, val))
    cmd_ADXL345_DEBUG_WRITE_help = "Set accelerometer register (for debugging)"
    # Store VAL (0..255) in register REG (limited to 29..57)
    def cmd_ADXL345_DEBUG_WRITE(self, gcmd):
        self._require_idle(gcmd)
        reg = gcmd.get("REG", minval=29, maxval=57, parser=self._parse_int)
        val = gcmd.get("VAL", minval=0, maxval=255, parser=self._parse_int)
        self.chip.set_reg(reg, val)
# Helper class for chip clock synchronization via linear regression
#
# Keeps exponentially weighted running averages of (mcu clock, chip
# clock) observations, plus their variance/covariance, and uses them to
# translate a chip clock value into an mcu clock (or print time).
class ClockSyncRegression:
    def __init__(self, mcu, chip_clock_smooth, decay = 1. / 20.):
        self.mcu = mcu
        # Number of chip clocks over which get_clock_translation()
        # converges back onto the regression line
        self.chip_clock_smooth = chip_clock_smooth
        # Weight given to each new observation in update()
        self.decay = decay
        self.last_chip_clock = 0.
        self.last_exp_mcu_clock = 0.
        self.mcu_clock_avg = 0.
        self.mcu_clock_variance = 0.
        self.chip_clock_avg = 0.
        self.chip_clock_covariance = 0.
    # Restart the regression from a single known point
    def reset(self, mcu_clock, chip_clock):
        self.last_mcu_clock = mcu_clock
        self.mcu_clock_avg = mcu_clock
        self.chip_clock_avg = chip_clock
        self.mcu_clock_variance = 0.
        self.chip_clock_covariance = 0.
        self.last_chip_clock = 0.
        self.last_exp_mcu_clock = 0.
    # Fold a new (mcu_clock, chip_clock) observation into the regression
    def update(self, mcu_clock, chip_clock):
        weight = self.decay
        keep = 1. - weight
        # Running average and variance of the mcu clock
        mcu_delta = mcu_clock - self.mcu_clock_avg
        self.mcu_clock_avg += weight * mcu_delta
        self.mcu_clock_variance = keep * (
            self.mcu_clock_variance + mcu_delta**2 * weight)
        # Running average of the chip clock and covariance with mcu clock
        chip_delta = chip_clock - self.chip_clock_avg
        self.chip_clock_avg += weight * chip_delta
        self.chip_clock_covariance = keep * (
            self.chip_clock_covariance + mcu_delta*chip_delta*weight)
    # Note the chip clock of the last reported sample along with the mcu
    # clock the current translation assigns to it
    def set_last_chip_clock(self, chip_clock):
        mcu_base, chip_base, inv_cfreq = self.get_clock_translation()
        self.last_chip_clock = chip_clock
        self.last_exp_mcu_clock = mcu_base + (chip_clock-chip_base) * inv_cfreq
    # Return (base_mcu_clock, base_chip_clock, mcu_clocks_per_chip_clock)
    def get_clock_translation(self):
        regress_inv_freq = self.mcu_clock_variance / self.chip_clock_covariance
        if self.last_chip_clock:
            # Find mcu clock associated with future chip_clock
            target_chip = self.last_chip_clock + self.chip_clock_smooth
            chip_offset = target_chip - self.chip_clock_avg
            target_mcu = self.mcu_clock_avg + chip_offset * regress_inv_freq
            # Calculate frequency to converge at future point
            mcu_span = target_mcu - self.last_exp_mcu_clock
            smooth_inv_freq = mcu_span / self.chip_clock_smooth
            return (self.last_exp_mcu_clock, self.last_chip_clock,
                    smooth_inv_freq)
        # No sample reported yet - use the plain regression line
        return self.mcu_clock_avg, self.chip_clock_avg, regress_inv_freq
    # Same as get_clock_translation(), but expressed in print time:
    # returns (base_print_time, base_chip_clock, seconds_per_chip_clock)
    def get_time_translation(self):
        mcu_base, chip_base, inv_cfreq = self.get_clock_translation()
        to_print_time = self.mcu.clock_to_print_time
        base_time = to_print_time(mcu_base)
        inv_freq = to_print_time(mcu_base + inv_cfreq) - base_time
        return base_time, chip_base, inv_freq
# Delay (in seconds) between now and the requested start of bulk reading
MIN_MSG_TIME = 0.100

# Printer class that controls ADXL345 chip
#
# Configures the chip over SPI, starts/stops the micro-controller bulk
# query code, tracks the chip clock relative to the mcu clock and
# converts the raw data messages into timestamped samples.
class ADXL345:
    def __init__(self, config):
        self.printer = config.get_printer()
        ADXLCommandHelper(config, self)
        # Non-zero only while measurements are running
        self.query_rate = 0
        # Map each axis name to (index in raw sample, scale factor)
        am = {'x': (0, SCALE), 'y': (1, SCALE), 'z': (2, SCALE),
              '-x': (0, -SCALE), '-y': (1, -SCALE), '-z': (2, -SCALE)}
        # Strip once so that validation and lookup use the same key
        axes_map = [a.strip() for a in
                    config.getlist('axes_map', ('x','y','z'), count=3)]
        if any(a not in am for a in axes_map):
            raise config.error("Invalid adxl345 axes_map parameter")
        self.axes_map = [am[a] for a in axes_map]
        self.data_rate = config.getint('rate', 3200)
        if self.data_rate not in QUERY_RATES:
            raise config.error("Invalid rate parameter: %d" % (self.data_rate,))
        # Measurement storage (accessed from background thread)
        self.lock = threading.Lock()
        self.raw_samples = []
        # Setup mcu sensor_adxl345 bulk query code
        self.spi = bus.MCU_SPI_from_config(config, 3, default_speed=5000000)
        self.mcu = mcu = self.spi.get_mcu()
        self.oid = oid = mcu.create_oid()
        self.query_adxl345_cmd = self.query_adxl345_end_cmd = None
        self.query_adxl345_status_cmd = None
        mcu.add_config_cmd("config_adxl345 oid=%d spi_oid=%d"
                           % (oid, self.spi.get_oid()))
        mcu.add_config_cmd("query_adxl345 oid=%d clock=0 rest_ticks=0"
                           % (oid,), on_restart=True)
        mcu.register_config_callback(self._build_config)
        mcu.register_response(self._handle_adxl345_data, "adxl345_data", oid)
        # Clock tracking
        self.last_sequence = self.last_limit_count = self.max_query_duration = 0
        self.clock_sync = ClockSyncRegression(self.mcu, 640)
        # API server endpoints
        self.api_dump = motion_report.APIDumpHelper(
            self.printer, self._api_update, self._api_startstop, 0.100)
        self.name = "default"
        if len(config.get_name().split()) > 1:
            self.name = config.get_name().split()[1]
        wh = self.printer.lookup_object('webhooks')
        wh.register_mux_endpoint("adxl345/dump_adxl345", "sensor", self.name,
                                 self._handle_dump_adxl345)
    # Look up the mcu commands (registered as an mcu config callback)
    def _build_config(self):
        cmdqueue = self.spi.get_command_queue()
        self.query_adxl345_cmd = self.mcu.lookup_command(
            "query_adxl345 oid=%c clock=%u rest_ticks=%u", cq=cmdqueue)
        self.query_adxl345_end_cmd = self.mcu.lookup_query_command(
            "query_adxl345 oid=%c clock=%u rest_ticks=%u",
            "adxl345_status oid=%c clock=%u query_ticks=%u next_sequence=%hu"
            " buffered=%c fifo=%c limit_count=%hu", oid=self.oid, cq=cmdqueue)
        self.query_adxl345_status_cmd = self.mcu.lookup_query_command(
            "query_adxl345_status oid=%c",
            "adxl345_status oid=%c clock=%u query_ticks=%u next_sequence=%hu"
            " buffered=%c fifo=%c limit_count=%hu", oid=self.oid, cq=cmdqueue)
    # Return the value of a chip register
    def read_reg(self, reg):
        params = self.spi.spi_transfer([reg | REG_MOD_READ, 0x00])
        response = bytearray(params['response'])
        # The register value is the second byte of the response
        return response[1]
    # Write a chip register and verify the write by reading it back.
    # Raises a command error if the value read back differs from "val".
    def set_reg(self, reg, val, minclock=0):
        self.spi.spi_send([reg, val & 0xFF], minclock=minclock)
        stored_val = self.read_reg(reg)
        if stored_val != val:
            raise self.printer.command_error(
                    "Failed to set ADXL345 register [0x%x] to 0x%x: got 0x%x. "
                    "This is generally indicative of connection problems "
                    "(e.g. faulty wiring) or a faulty adxl345 chip." % (
                        reg, val, stored_val))
    # Measurement collection
    def is_measuring(self):
        return self.query_rate > 0
    # Store an "adxl345_data" message until the next _api_update() call
    def _handle_adxl345_data(self, params):
        with self.lock:
            self.raw_samples.append(params)
    # Convert raw data messages into a list of (print_time, x, y, z)
    # tuples and tell the clock tracking about the last decoded sample.
    def _extract_samples(self, raw_samples):
        # Load variables to optimize inner loop below
        (x_pos, x_scale), (y_pos, y_scale), (z_pos, z_scale) = self.axes_map
        last_sequence = self.last_sequence
        time_base, chip_base, inv_freq = self.clock_sync.get_time_translation()
        # Process every message in raw_samples
        count = 0
        # Chip clock of the last decoded sample (None if nothing decoded)
        last_chip_clock = None
        samples = [None] * (len(raw_samples) * 8)
        for params in raw_samples:
            # Rebuild the full sequence number from the 16 bit value in
            # the message (signed difference relative to last_sequence)
            seq_diff = (last_sequence - params['sequence']) & 0xffff
            seq_diff -= (seq_diff & 0x8000) << 1
            seq = last_sequence - seq_diff
            d = bytearray(params['data'])
            len_d = len(d)
            # Each reading is a 13 bit signed value in two bytes (low
            # byte first)
            sdata = [(d[i] | ((d[i+1] & 0x1f) << 8)) - ((d[i+1] & 0x10) << 9)
                     for i in range(0, len_d-1, 2)]
            msg_cdiff = seq * 8 - chip_base
            # A sample is three readings (six bytes)
            num_samples = len_d // 6
            for i in range(num_samples):
                x = round(sdata[i*3 + x_pos] * x_scale, 6)
                y = round(sdata[i*3 + y_pos] * y_scale, 6)
                z = round(sdata[i*3 + z_pos] * z_scale, 6)
                ptime = round(time_base + (msg_cdiff + i) * inv_freq, 6)
                samples[count] = (ptime, x, y, z)
                count += 1
            if num_samples:
                # Do not depend on the loop variable after the loop - it
                # is unset (or stale) if a message holds no full sample
                last_chip_clock = seq * 8 + num_samples - 1
        if last_chip_clock is not None:
            self.clock_sync.set_last_chip_clock(last_chip_clock)
        del samples[count:]
        return samples
    # Query the mcu for the current status and feed the result into the
    # clock tracking.  Raises a command error if no valid fifo count
    # could be obtained.
    def _update_clock(self, minclock=0):
        # Query current state
        for retry in range(5):
            params = self.query_adxl345_status_cmd.send([self.oid],
                                                        minclock=minclock)
            fifo = params['fifo'] & 0x7f
            # A count above 32 is treated as invalid and is retried
            if fifo <= 32:
                break
        else:
            raise self.printer.command_error("Unable to query adxl345 fifo")
        mcu_clock = self.mcu.clock32_to_clock64(params['clock'])
        # Extend the 16 bit counters from the mcu to running totals
        sequence = (self.last_sequence & ~0xffff) | params['next_sequence']
        if sequence < self.last_sequence:
            sequence += 0x10000
        self.last_sequence = sequence
        buffered = params['buffered']
        limit_count = (self.last_limit_count & ~0xffff) | params['limit_count']
        if limit_count < self.last_limit_count:
            limit_count += 0x10000
        self.last_limit_count = limit_count
        duration = params['query_ticks']
        if duration > self.max_query_duration:
            # Skip measurement as a high query time could skew clock tracking
            self.max_query_duration = max(2 * self.max_query_duration,
                                          self.mcu.seconds_to_clock(.000005))
            return
        self.max_query_duration = 2 * duration
        # Samples so far: 8 per sequence number, plus the samples (six
        # bytes each) buffered in the mcu, plus the chip fifo count
        msg_count = sequence * 8 + buffered // 6 + fifo
        # The "chip clock" is the message counter plus .5 for average
        # inaccuracy of query responses and plus .5 for assumed offset
        # of adxl345 hw processing time.
        chip_clock = msg_count + 1
        self.clock_sync.update(mcu_clock + duration // 2, chip_clock)
    # Configure the chip and start bulk reading (no-op if already
    # measuring).  Raises a command error on an unexpected device id.
    def _start_measurements(self):
        if self.is_measuring():
            return
        # In case of miswiring, testing ADXL345 device ID prevents treating
        # noise or wrong signal as a correctly initialized device
        dev_id = self.read_reg(REG_DEVID)
        if dev_id != ADXL345_DEV_ID:
            raise self.printer.command_error("Invalid adxl345 id (got %x vs %x)"
                                             % (dev_id, ADXL345_DEV_ID))
        # Setup chip in requested query rate
        self.set_reg(REG_POWER_CTL, 0x00)
        self.set_reg(REG_DATA_FORMAT, 0x0B)
        self.set_reg(REG_FIFO_CTL, 0x00)
        self.set_reg(REG_BW_RATE, QUERY_RATES[self.data_rate])
        self.set_reg(REG_FIFO_CTL, 0x80)
        # Setup samples
        with self.lock:
            self.raw_samples = []
        # Start bulk reading
        systime = self.printer.get_reactor().monotonic()
        print_time = self.mcu.estimated_print_time(systime) + MIN_MSG_TIME
        reqclock = self.mcu.print_time_to_clock(print_time)
        rest_ticks = self.mcu.seconds_to_clock(4. / self.data_rate)
        self.query_rate = self.data_rate
        self.query_adxl345_cmd.send([self.oid, reqclock, rest_ticks],
                                    reqclock=reqclock)
        logging.info("ADXL345 starting '%s' measurements", self.name)
        # Initialize clock tracking
        self.last_sequence = 0
        self.last_limit_count = 0
        self.clock_sync.reset(reqclock, 0)
        # Accept the first status response regardless of its duration;
        # _update_clock() lowers the limit, so it is raised again after
        self.max_query_duration = 1 << 31
        self._update_clock(minclock=reqclock)
        self.max_query_duration = 1 << 31
    # Stop bulk reading and discard pending raw data (no-op if idle)
    def _finish_measurements(self):
        if not self.is_measuring():
            return
        # Halt bulk reading
        self.query_adxl345_end_cmd.send([self.oid, 0, 0])
        self.query_rate = 0
        with self.lock:
            self.raw_samples = []
        logging.info("ADXL345 finished '%s' measurements", self.name)
    # API interface
    # Callback given to APIDumpHelper: returns a dict holding the new
    # samples, or an empty dict if there are none.
    def _api_update(self, eventtime):
        self._update_clock()
        with self.lock:
            raw_samples = self.raw_samples
            self.raw_samples = []
        if not raw_samples:
            return {}
        samples = self._extract_samples(raw_samples)
        if not samples:
            return {}
        return {'data': samples, 'overflows': self.last_limit_count}
    # Callback given to APIDumpHelper to start/stop the measurements
    def _api_startstop(self, is_start):
        if is_start:
            self._start_measurements()
        else:
            self._finish_measurements()
    # Webhooks endpoint: subscribe the request to the sample stream
    def _handle_dump_adxl345(self, web_request):
        self.api_dump.add_client(web_request)
        hdr = ('time', 'x_acceleration', 'y_acceleration', 'z_acceleration')
        web_request.send({'header': hdr})
    # Start a capture for in-process use and return its query helper.
    # Raises a command error if a measurement is already in progress.
    def start_internal_client(self):
        if self.is_measuring():
            raise self.printer.command_error(
                "ADXL345 measurement already in progress")
        cconn = self.api_dump.add_internal_client()
        return ADXL345QueryHelper(self.printer, cconn)
# Create the ADXL345 object for the given config section
def load_config(config):
    chip = ADXL345(config)
    return chip
# Same as load_config(): create the ADXL345 object for the given config
# section
def load_config_prefix(config):
    chip = ADXL345(config)
    return chip