flashtool: support USB-CAN bridge detection

This enables single step flashing for Klipper devices
configured as a USB-CAN bridge.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2025-03-04 20:16:41 -05:00
parent 40b22bdf6e
commit b22330e917

View File

@ -86,6 +86,7 @@ CANBUS_NODEID_OFFSET = 128
# USB IDs
KATAPULT_USB_ID = "1d50:6177"
KLIPPER_USB_ID = "1d50:614e"
GS_CAN_USB_ID = "1d50:606f"
SERIAL_BL_REQ = b"~ \x1c Request Serial Bootloader!! ~"
class FlashError(Exception):
@ -94,19 +95,24 @@ class FlashError(Exception):
def get_usb_info(usb_path: pathlib.Path) -> Dict[str, Any]:
usb_info: Dict[str, Any] = {}
id_path = usb_path.joinpath("idVendor")
prod_path = usb_path.joinpath("idProduct")
prod_id_path = usb_path.joinpath("idProduct")
mfr_path = usb_path.joinpath("manufacturer")
prod_path = usb_path.joinpath("product")
if id_path.is_file() and prod_path.is_file():
serial_path = usb_path.joinpath("serial")
usb_info["usb_id"] = ""
if id_path.is_file() and prod_id_path.is_file():
vid = id_path.read_text().strip().lower()
pid = prod_path.read_text().strip().lower()
pid = prod_id_path.read_text().strip().lower()
usb_info["usb_id"] = f"{vid}:{pid}"
usb_info["manufacturer"] = "unknown"
usb_info["product"] = "unknown"
usb_info["serial_number"] = ""
if mfr_path.is_file():
usb_info["manufacturer"] = mfr_path.read_text().strip().lower()
if prod_path.is_file():
usb_info["product"] = prod_path.read_text().strip().lower()
if serial_path.is_file():
usb_info["serial_number"] = serial_path.read_text().strip().lower()
return usb_info
def get_usb_path(device: pathlib.Path) -> Optional[pathlib.Path]:
@ -136,6 +142,72 @@ def get_stable_usb_symlink(device: pathlib.Path) -> pathlib.Path:
pass
return device_path
# Python Port of fasthash6
# Host URL: http://github.com/ztanml/fast-hash
# Original Copyright:
# Copyright (C) 2012 Zilong Tan (eric.zltan@gmail.com)
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use, copy,
# modify, merge, publish, distribute, sublicense, and/or sell copies
# of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
MASK_64 = 0xFFFFFFFFFFFFFFFF
def mix(val: int) -> int:
val ^= val >> 23
val = (val * 0x2127599bf4325c37) & MASK_64
val ^= val >> 47
return val
def fasthash64(buffer: bytes, seed: int) -> int:
m = 0x880355f21e6d1965
h = seed ^ ((len(buffer) * m) & MASK_64)
long_count = len(buffer) // 8
remaining: bytes = buffer
if long_count:
byte_count = long_count * 8
buf = struct.unpack(f"<{long_count}Q", buffer[:byte_count])
for v in buf:
h ^= mix(v)
h = (h * m) & MASK_64
remaining = buffer[byte_count:]
if remaining:
v = 0
print(remaining)
for i in range(len(remaining), 0, -1):
v ^= remaining[i -1] << (8 * (i - 1))
h ^= mix(v)
h = (h * m) & MASK_64
return mix(h)
def convert_usbsn_to_uuid(serial_number: str) -> int:
seed = 0xA16231A7
chip_id = bytes.fromhex(serial_number)
result = fasthash64(chip_id, seed)
swapped = 0
# The UUID returned by Klipper firmware is byteswapped
for i in range(6):
swapped |= ((result >> (i * 8)) & 0xFF) << ((5 - i) * 8)
return swapped
class CanFlasher:
def __init__(
self,
@ -475,6 +547,14 @@ class BaseSocket:
def is_query(self) -> bool:
return self._args.query
@property
def is_usb_can_bridge(self) -> bool:
return False
@property
def usb_serial_path(self) -> pathlib.Path:
raise NotImplementedError()
def _check_firmware(self) -> None:
if self.is_flash_req and not self._fw_path.is_file():
raise FlashError("Invalid firmware path '%s'" % (self._fw_path))
@ -490,6 +570,8 @@ class CanSocket(BaseSocket):
super().__init__(args)
self._uuid = 0
self._can_interface = args.interface
self._can_bridge_path: pathlib.Path | None = None
self._can_bridge_serial_path: pathlib.Path | None = None
if not self.is_query:
if args.uuid is None:
raise FlashError(
@ -498,6 +580,7 @@ class CanSocket(BaseSocket):
else:
intf = self._can_interface
self._uuid = int(args.uuid, 16)
self._search_canbus_bridge()
output_line(f"Connecting to CAN UUID {args.uuid} on interface {intf}")
self.cansock = socket.socket(socket.PF_CAN, socket.SOCK_RAW,
socket.CAN_RAW)
@ -512,6 +595,26 @@ class CanSocket(BaseSocket):
self.output_busy = False
self.closed = True
@property
def is_usb_can_bridge(self) -> bool:
return self._can_bridge_path is not None
@property
def usb_serial_path(self) -> pathlib.Path:
if self._can_bridge_serial_path is not None:
return self._can_bridge_serial_path
# find tty path
assert self._can_bridge_path is not None
dir_name = self._can_bridge_path.name
ttys = list(self._can_bridge_path.glob(f"{dir_name}:*/tty/tty*"))
if not ttys:
raise FlashError("Failed to locate serial tty path")
tty_path = pathlib.Path("/dev").joinpath(ttys[0].name)
if not tty_path.exists():
raise FlashError("Detected tty path does not exist")
self._can_bridge_serial_path = tty_path
return tty_path
def _handle_can_response(self) -> None:
try:
data = self.cansock.recv(4096)
@ -630,6 +733,54 @@ class CanSocket(BaseSocket):
self.nodes[decoded_id + 1] = node
return node
def _search_canbus_bridge(self) -> None:
can_intf = self._can_interface.lower()
sysfs_usb_path = pathlib.Path("/sys/bus/usb/devices")
if not sysfs_usb_path.is_dir():
return
for item in sysfs_usb_path.iterdir():
if not item.joinpath("bDeviceClass").is_file():
continue
usb_info = get_usb_info(item)
if (
usb_info["usb_id"] != GS_CAN_USB_ID or
usb_info["manufacturer"] != "klipper"
):
continue
if not list(item.glob(f"{item.name}:*/net/{can_intf}")):
continue
# Klipper GS USB Device matches
serial_no = usb_info["serial_number"]
logging.info(
f"Found Klipper USB-CAN bridge on {can_intf}, serial {serial_no}"
)
if serial_no:
det_uuid = convert_usbsn_to_uuid(serial_no)
logging.info(
f"Detected UUID: {det_uuid:x}, provided UUID: {self._uuid:x}"
)
if det_uuid == self._uuid:
self._can_bridge_path = item
output_line(f"Canbus Bridge detected at {item}")
break
async def _wait_canbridge_reset(self) -> None:
if self._can_bridge_path is None:
return
output("Waiting for USB Reconnect.")
for _ in range(8):
await asyncio.sleep(.5)
output(".")
usb_info = get_usb_info(self._can_bridge_path)
mfr = usb_info.get("manufacturer")
if mfr == "katapult":
await asyncio.sleep(.5)
serial_path = self.usb_serial_path
output_line(f"Katapult detected at serial port {serial_path}")
break
else:
output_line("timed out")
async def run(self) -> None:
self._check_firmware()
try:
@ -642,6 +793,10 @@ class CanSocket(BaseSocket):
self.cansock.fileno(), self._handle_can_response)
if self.is_flash_req or self.is_bootloader_req:
self._jump_to_bootloader(self._uuid)
if self.is_usb_can_bridge:
await self._wait_canbridge_reset()
return
else:
await asyncio.sleep(1.0)
if self.is_bootloader_req:
return
@ -701,6 +856,10 @@ class SerialSocket(BaseSocket):
def is_query(self) -> bool:
return False
@property
def usb_serial_path(self) -> pathlib.Path:
return pathlib.Path(self._device)
def _handle_response(self) -> None:
assert self.serial is not None
try:
@ -899,6 +1058,11 @@ async def main(args: argparse.Namespace) -> int:
else:
sock = SerialSocket(args)
await sock.run()
if sock.is_usb_can_bridge and not sock.is_bootloader_req:
args.device = str(sock.usb_serial_path)
sock.close()
sock = SerialSocket(args)
await sock.run()
except Exception:
logging.exception("Flash Tool Error")
return 1