diff --git a/scripts/flashtool.py b/scripts/flashtool.py index 2f2a5b0..b5d9d93 100755 --- a/scripts/flashtool.py +++ b/scripts/flashtool.py @@ -86,6 +86,7 @@ CANBUS_NODEID_OFFSET = 128 # USB IDs KATAPULT_USB_ID = "1d50:6177" KLIPPER_USB_ID = "1d50:614e" +GS_CAN_USB_ID = "1d50:606f" SERIAL_BL_REQ = b"~ \x1c Request Serial Bootloader!! ~" class FlashError(Exception): @@ -94,19 +95,24 @@ class FlashError(Exception): def get_usb_info(usb_path: pathlib.Path) -> Dict[str, Any]: usb_info: Dict[str, Any] = {} id_path = usb_path.joinpath("idVendor") - prod_path = usb_path.joinpath("idProduct") + prod_id_path = usb_path.joinpath("idProduct") mfr_path = usb_path.joinpath("manufacturer") prod_path = usb_path.joinpath("product") - if id_path.is_file() and prod_path.is_file(): + serial_path = usb_path.joinpath("serial") + usb_info["usb_id"] = "" + if id_path.is_file() and prod_id_path.is_file(): vid = id_path.read_text().strip().lower() - pid = prod_path.read_text().strip().lower() + pid = prod_id_path.read_text().strip().lower() usb_info["usb_id"] = f"{vid}:{pid}" usb_info["manufacturer"] = "unknown" usb_info["product"] = "unknown" + usb_info["serial_number"] = "" if mfr_path.is_file(): usb_info["manufacturer"] = mfr_path.read_text().strip().lower() if prod_path.is_file(): usb_info["product"] = prod_path.read_text().strip().lower() + if serial_path.is_file(): + usb_info["serial_number"] = serial_path.read_text().strip().lower() return usb_info def get_usb_path(device: pathlib.Path) -> Optional[pathlib.Path]: @@ -136,6 +142,72 @@ def get_stable_usb_symlink(device: pathlib.Path) -> pathlib.Path: pass return device_path + +# Python Port of fasthash6 +# Host URL: http://github.com/ztanml/fast-hash +# Original Copyright: +# Copyright (C) 2012 Zilong Tan (eric.zltan@gmail.com) +# +# MIT License +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, copy, +# modify, merge, publish, distribute, sublicense, and/or sell copies +# of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +MASK_64 = 0xFFFFFFFFFFFFFFFF + +def mix(val: int) -> int: + val ^= val >> 23 + val = (val * 0x2127599bf4325c37) & MASK_64 + val ^= val >> 47 + return val + +def fasthash64(buffer: bytes, seed: int) -> int: + m = 0x880355f21e6d1965 + h = seed ^ ((len(buffer) * m) & MASK_64) + long_count = len(buffer) // 8 + remaining: bytes = buffer + if long_count: + byte_count = long_count * 8 + buf = struct.unpack(f"<{long_count}Q", buffer[:byte_count]) + for v in buf: + h ^= mix(v) + h = (h * m) & MASK_64 + remaining = buffer[byte_count:] + if remaining: + v = 0 + print(remaining) + for i in range(len(remaining), 0, -1): + v ^= remaining[i -1] << (8 * (i - 1)) + h ^= mix(v) + h = (h * m) & MASK_64 + return mix(h) + +def convert_usbsn_to_uuid(serial_number: str) -> int: + seed = 0xA16231A7 + chip_id = bytes.fromhex(serial_number) + result = fasthash64(chip_id, seed) + swapped = 0 + # The UUID returned by Klipper firmware is byteswapped + for i in range(6): + swapped |= ((result >> (i * 8)) & 0xFF) << ((5 - i) * 8) + return swapped + class CanFlasher: def __init__( self, @@ -475,6 +547,14 @@ class BaseSocket: def is_query(self) -> bool: return self._args.query + @property + def is_usb_can_bridge(self) -> bool: + return False + + @property + def usb_serial_path(self) -> pathlib.Path: + raise NotImplementedError() + def _check_firmware(self) -> None: if self.is_flash_req and not self._fw_path.is_file(): raise FlashError("Invalid firmware path '%s'" % (self._fw_path)) @@ -490,6 +570,8 @@ class CanSocket(BaseSocket): super().__init__(args) self._uuid = 0 self._can_interface = args.interface + self._can_bridge_path: pathlib.Path | None = None + self._can_bridge_serial_path: pathlib.Path | None = None if not self.is_query: if args.uuid is None: raise FlashError( @@ -498,6 +580,7 @@ class CanSocket(BaseSocket): else: intf = self._can_interface self._uuid = int(args.uuid, 16) + self._search_canbus_bridge() output_line(f"Connecting to CAN UUID {args.uuid} on interface {intf}") self.cansock = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) @@ -512,6 +595,26 @@ class CanSocket(BaseSocket): self.output_busy = False self.closed = True + @property + def is_usb_can_bridge(self) -> bool: + return self._can_bridge_path is not None + + @property + def usb_serial_path(self) -> pathlib.Path: + if self._can_bridge_serial_path is not None: + return self._can_bridge_serial_path + # find tty path + assert self._can_bridge_path is not None + dir_name = self._can_bridge_path.name + ttys = list(self._can_bridge_path.glob(f"{dir_name}:*/tty/tty*")) + if not ttys: + raise FlashError("Failed to locate serial tty path") + tty_path = pathlib.Path("/dev").joinpath(ttys[0].name) + if not tty_path.exists(): + raise FlashError("Detected tty path does not exist") + self._can_bridge_serial_path = tty_path + return tty_path + def _handle_can_response(self) -> None: try: data = self.cansock.recv(4096) @@ -630,6 +733,54 @@ class CanSocket(BaseSocket): self.nodes[decoded_id + 1] = node return node + def _search_canbus_bridge(self) -> None: + can_intf = self._can_interface.lower() + sysfs_usb_path = pathlib.Path("/sys/bus/usb/devices") + if not sysfs_usb_path.is_dir(): + return + for item in sysfs_usb_path.iterdir(): + if not item.joinpath("bDeviceClass").is_file(): + continue + usb_info = get_usb_info(item) + if ( + usb_info["usb_id"] != GS_CAN_USB_ID or + usb_info["manufacturer"] != "klipper" + ): + continue + if not list(item.glob(f"{item.name}:*/net/{can_intf}")): + continue + # Klipper GS USB Device matches + serial_no = usb_info["serial_number"] + logging.info( + f"Found Klipper USB-CAN bridge on {can_intf}, serial {serial_no}" + ) + if serial_no: + det_uuid = convert_usbsn_to_uuid(serial_no) + logging.info( + f"Detected UUID: {det_uuid:x}, provided UUID: {self._uuid:x}" + ) + if det_uuid == self._uuid: + self._can_bridge_path = item + output_line(f"Canbus Bridge detected at {item}") + break + + async def _wait_canbridge_reset(self) -> None: + if self._can_bridge_path is None: + return + output("Waiting for USB Reconnect.") + for _ in range(8): + await asyncio.sleep(.5) + output(".") + usb_info = get_usb_info(self._can_bridge_path) + mfr = usb_info.get("manufacturer") + if mfr == "katapult": + await asyncio.sleep(.5) + serial_path = self.usb_serial_path + output_line(f"Katapult detected at serial port {serial_path}") + break + else: + output_line("timed out") + async def run(self) -> None: self._check_firmware() try: @@ -642,7 +793,11 @@ class CanSocket(BaseSocket): self.cansock.fileno(), self._handle_can_response) if self.is_flash_req or self.is_bootloader_req: self._jump_to_bootloader(self._uuid) - await asyncio.sleep(1.0) + if self.is_usb_can_bridge: + await self._wait_canbridge_reset() + return + else: + await asyncio.sleep(1.0) if self.is_bootloader_req: return self._reset_nodes() @@ -701,6 +856,10 @@ class SerialSocket(BaseSocket): def is_query(self) -> bool: return False + @property + def usb_serial_path(self) -> pathlib.Path: + return pathlib.Path(self._device) + def _handle_response(self) -> None: assert self.serial is not None try: @@ -899,6 +1058,11 @@ async def main(args: argparse.Namespace) -> int: else: sock = SerialSocket(args) await sock.run() + if sock.is_usb_can_bridge and not sock.is_bootloader_req: + args.device = str(sock.usb_serial_path) + sock.close() + sock = SerialSocket(args) + await sock.run() except Exception: logging.exception("Flash Tool Error") return 1