klipper/klippy/extras/motion_queuing.py
Kevin O'Connor 864c78f24a motion_queueing: Add flush_steppersync()
Move the mcu.flush_moves() code to motion_queuing.flush_steppersync().

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
2025-08-11 19:43:35 -04:00

77 lines
3.1 KiB
Python

# Helper code for low-level motion queuing and flushing
#
# Copyright (C) 2025 Kevin O'Connor <kevin@koconnor.net>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import logging
import chelper
class PrinterMotionQueuing:
    # Central registry for the low-level motion objects (trapq,
    # stepcompress, steppersync) allocated via the chelper C library,
    # plus the steppers and callbacks that take part in a flush.
    def __init__(self, config):
        self.printer = config.get_printer()
        # Objects registered via register_stepper()
        self.steppers = []
        # Handles returned by allocate_trapq()
        self.trapqs = []
        # List of (mcu, stepcompress) tuples from allocate_stepcompress()
        self.stepcompress = []
        # List of (mcu, steppersync) tuples from allocate_steppersync()
        self.steppersyncs = []
        # Callables registered via register_flush_callback()
        self.flush_callbacks = []
        # Cache the C functions that are invoked on every flush/clean
        _, ffi_lib = chelper.get_ffi()
        self.trapq_finalize_moves = ffi_lib.trapq_finalize_moves
        self.steppersync_flush = ffi_lib.steppersync_flush
    def allocate_trapq(self):
        # Allocate a new trapq, track it for clean_motion_queues(), and
        # return it. ffi_main.gc() attaches trapq_free as the destructor.
        ffi_main, ffi_lib = chelper.get_ffi()
        trapq = ffi_main.gc(ffi_lib.trapq_alloc(), ffi_lib.trapq_free)
        self.trapqs.append(trapq)
        return trapq
    def allocate_stepcompress(self, mcu, oid):
        # Allocate a stepcompress object for the given oid and remember
        # which mcu it belongs to (used later by allocate_steppersync()).
        ffi_main, ffi_lib = chelper.get_ffi()
        sc = ffi_main.gc(ffi_lib.stepcompress_alloc(oid),
                         ffi_lib.stepcompress_free)
        self.stepcompress.append((mcu, sc))
        return sc
    def allocate_steppersync(self, mcu, serialqueue, move_count):
        # Allocate a steppersync covering every stepcompress object that
        # was previously allocated for this exact mcu (identity match).
        stepqueues = [sc for sc_mcu, sc in self.stepcompress
                      if sc_mcu is mcu]
        ffi_main, ffi_lib = chelper.get_ffi()
        ss = ffi_main.gc(
            ffi_lib.steppersync_alloc(serialqueue, stepqueues,
                                      len(stepqueues), move_count),
            ffi_lib.steppersync_free)
        self.steppersyncs.append((mcu, ss))
        return ss
    def register_stepper(self, config, stepper):
        # Track a stepper so that flush_motion_queues() will call its
        # generate_steps() method. The config argument is not used here.
        self.steppers.append(stepper)
    def register_flush_callback(self, callback):
        # Register callback(must_flush_time) to be invoked at the start
        # of every flush_motion_queues() call.
        self.flush_callbacks.append(callback)
    def flush_motion_queues(self, must_flush_time, max_step_gen_time):
        # Run all flush callbacks (in registration order) and only then
        # ask each registered stepper to generate its steps.
        for cb in self.flush_callbacks:
            cb(must_flush_time)
        for stepper in self.steppers:
            stepper.generate_steps(max_step_gen_time)
    def clean_motion_queues(self, trapq_free_time, clear_history_time):
        # Call trapq_finalize_moves() on every allocated trapq
        for trapq in self.trapqs:
            self.trapq_finalize_moves(trapq, trapq_free_time,
                                      clear_history_time)
    def flush_steppersync(self, print_time, clear_history_time):
        # Flush each steppersync up to print_time (converted to that
        # mcu's clock). Raises mcu.error if the C code reports a failure.
        for mcu, ss in self.steppersyncs:
            clock = mcu.print_time_to_clock(print_time)
            if clock < 0:
                # Negative clock for this mcu - skip it
                continue
            # The history clock is clamped so it is never negative
            clear_history_clock = max(
                0, mcu.print_time_to_clock(clear_history_time))
            ret = self.steppersync_flush(ss, clock, clear_history_clock)
            if ret:
                raise mcu.error("Internal error in MCU '%s' stepcompress"
                                % (mcu.get_name(),))
    def wipe_trapq(self, trapq):
        # Expire any remaining movement in the trapq (force to history list)
        NEVER = 9999999999999999.
        self.trapq_finalize_moves(trapq, NEVER, 0.)
    def lookup_trapq_append(self):
        # Return the C trapq_append() function from the chelper library
        _, ffi_lib = chelper.get_ffi()
        return ffi_lib.trapq_append
def load_config(config):
    # Module entry point: build the PrinterMotionQueuing instance
    # from the supplied config object and return it.
    return PrinterMotionQueuing(config)