diff --git a/scripts/flash_can.py b/scripts/flash_can.py index b787709..1dcd789 100755 --- a/scripts/flash_can.py +++ b/scripts/flash_can.py @@ -5,16 +5,20 @@ # # This file may be distributed under the terms of the GNU GPLv3 license. from __future__ import annotations +import re import sys import os import asyncio import socket import struct +import fcntl +import termios import logging import errno import argparse import hashlib import pathlib +import shlex from typing import Dict, List, Optional, Union def output_line(msg: str) -> None: @@ -496,6 +500,29 @@ class CanSocket: self._loop.remove_reader(self.cansock.fileno()) self.cansock.close() +########################################################## +# +# USB Bootloader Entry Helpers: +# Sourced from flash_usb.py in Klipper +# +# Copyright (C) 2019 Kevin O'Connor +# +########################################################## + +def enter_usb_bootloader(device): + try: + f = open(device, 'rb') + fd = f.fileno() + fcntl.ioctl(fd, termios.TIOCMBIS, struct.pack('I', termios.TIOCM_DTR)) + t = termios.tcgetattr(fd) + t[4] = t[5] = termios.B1200 + output_line("Entering bootloader on %s\n" % (device,)) + termios.tcsetattr(fd, termios.TCSANOW, t) + fcntl.ioctl(fd, termios.TIOCMBIC, struct.pack('I', termios.TIOCM_DTR)) + f.close() + except (IOError, OSError) as e: + pass + class SerialSocket: def __init__(self, loop: asyncio.AbstractEventLoop): self._loop = loop @@ -510,6 +537,52 @@ class SerialSocket: self.close() self.node.feed_data(data) + async def _wait_bootloader(self, device_path: pathlib.Path) -> bool: + maxsleep = 10. + total_sleep = 0. + output("Entering USB Bootloader...") + while total_sleep < maxsleep: + await asyncio.sleep(.5) + output(".") + total_sleep += .5 + if device_path.exists(): + output_line("found") + return True + output_line("Failed to detect usb device") + return False + + async def _is_klipper_usb(self, ttyname: str) -> bool: + udev_cmd = shlex.split(f"udevadm info -n {ttyname}") + try: + proc = await asyncio.subprocess.create_subprocess_exec( + *udev_cmd, stdout=asyncio.subprocess.PIPE + ) + stdout, stderr = await proc.communicate() + ret = await proc.wait() + if ret != 0: + return False + data = stdout.decode() + logging.info(f"udevadm result:\n{data}") + match = re.search(r"ID_VENDOR=([0-9a-zA-Z]+)", data) + if match is not None and match.group(1).lower() == "klipper": + return True + except Exception: + logging.exception("udevadm error") + return False + + async def _find_usb_path(self, device: str) -> Optional[pathlib.Path]: + ttypath = pathlib.Path(device).resolve() + usb_path = pathlib.Path("/dev/serial/by-path") + if not usb_path.exists(): + return None + for devpath in usb_path.iterdir(): + if ( + devpath.samefile(ttypath) and + await self._is_klipper_usb(ttypath) + ): + return devpath + return None + def send(self, can_id: int, payload: bytes = b"") -> None: try: self.serial.write(payload) @@ -521,6 +594,12 @@ class SerialSocket: if not fw_path.is_file(): raise FlashCanError("Invalid firmware path '%s'" % (fw_path)) import serial + dev_path = await self._find_usb_path(intf) + if dev_path is not None: + # Klipper usb device detected, attempt to enter bootloader + intf = str(dev_path) + enter_usb_bootloader(dev_path) + await self._wait_bootloader(dev_path) self.serial_error = serial.SerialException try: serial_dev = serial.Serial(baudrate=baud, timeout=0,