mirror of
https://github.com/andreili/katapult.git
synced 2025-08-23 19:34:06 +02:00
flash_can: support entering the usb bootloader
Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
8702008265
commit
b267acc6b3
@ -5,16 +5,20 @@
|
||||
#
|
||||
# This file may be distributed under the terms of the GNU GPLv3 license.
|
||||
from __future__ import annotations
|
||||
import re
|
||||
import sys
|
||||
import os
|
||||
import asyncio
|
||||
import socket
|
||||
import struct
|
||||
import fcntl
|
||||
import termios
|
||||
import logging
|
||||
import errno
|
||||
import argparse
|
||||
import hashlib
|
||||
import pathlib
|
||||
import shlex
|
||||
from typing import Dict, List, Optional, Union
|
||||
|
||||
def output_line(msg: str) -> None:
|
||||
@ -496,6 +500,29 @@ class CanSocket:
|
||||
self._loop.remove_reader(self.cansock.fileno())
|
||||
self.cansock.close()
|
||||
|
||||
##########################################################
|
||||
#
|
||||
# USB Bootloader Entry Helpers:
|
||||
# Sourced from flash_usb.py in Klipper
|
||||
#
|
||||
# Copyright (C) 2019 Kevin O'Connor <kevin@koconnor.net>
|
||||
#
|
||||
##########################################################
|
||||
|
||||
def enter_usb_bootloader(device):
|
||||
try:
|
||||
f = open(device, 'rb')
|
||||
fd = f.fileno()
|
||||
fcntl.ioctl(fd, termios.TIOCMBIS, struct.pack('I', termios.TIOCM_DTR))
|
||||
t = termios.tcgetattr(fd)
|
||||
t[4] = t[5] = termios.B1200
|
||||
output_line("Entering bootloader on %s\n" % (device,))
|
||||
termios.tcsetattr(fd, termios.TCSANOW, t)
|
||||
fcntl.ioctl(fd, termios.TIOCMBIC, struct.pack('I', termios.TIOCM_DTR))
|
||||
f.close()
|
||||
except (IOError, OSError) as e:
|
||||
pass
|
||||
|
||||
class SerialSocket:
|
||||
def __init__(self, loop: asyncio.AbstractEventLoop):
|
||||
self._loop = loop
|
||||
@ -510,6 +537,52 @@ class SerialSocket:
|
||||
self.close()
|
||||
self.node.feed_data(data)
|
||||
|
||||
async def _wait_bootloader(self, device_path: pathlib.Path) -> bool:
|
||||
maxsleep = 10.
|
||||
total_sleep = 0.
|
||||
output("Entering USB Bootloader...")
|
||||
while total_sleep < maxsleep:
|
||||
await asyncio.sleep(.5)
|
||||
output(".")
|
||||
total_sleep += .5
|
||||
if device_path.exists():
|
||||
output_line("found")
|
||||
return True
|
||||
output_line("Failed to detect usb device")
|
||||
return False
|
||||
|
||||
async def _is_klipper_usb(self, ttyname: str) -> bool:
|
||||
udev_cmd = shlex.split(f"udevadm info -n {ttyname}")
|
||||
try:
|
||||
proc = await asyncio.subprocess.create_subprocess_exec(
|
||||
*udev_cmd, stdout=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, stderr = await proc.communicate()
|
||||
ret = await proc.wait()
|
||||
if ret != 0:
|
||||
return False
|
||||
data = stdout.decode()
|
||||
logging.info(f"udevadm result:\n{data}")
|
||||
match = re.search(r"ID_VENDOR=([0-9a-zA-Z]+)", data)
|
||||
if match is not None and match.group(1).lower() == "klipper":
|
||||
return True
|
||||
except Exception:
|
||||
logging.exception("udevadm error")
|
||||
return False
|
||||
|
||||
async def _find_usb_path(self, device: str) -> Optional[pathlib.Path]:
|
||||
ttypath = pathlib.Path(device).resolve()
|
||||
usb_path = pathlib.Path("/dev/serial/by-path")
|
||||
if not usb_path.exists():
|
||||
return None
|
||||
for devpath in usb_path.iterdir():
|
||||
if (
|
||||
devpath.samefile(ttypath) and
|
||||
await self._is_klipper_usb(ttypath)
|
||||
):
|
||||
return devpath
|
||||
return None
|
||||
|
||||
def send(self, can_id: int, payload: bytes = b"") -> None:
|
||||
try:
|
||||
self.serial.write(payload)
|
||||
@ -521,6 +594,12 @@ class SerialSocket:
|
||||
if not fw_path.is_file():
|
||||
raise FlashCanError("Invalid firmware path '%s'" % (fw_path))
|
||||
import serial
|
||||
dev_path = await self._find_usb_path(intf)
|
||||
if dev_path is not None:
|
||||
# Klipper usb device detected, attempt to enter bootloader
|
||||
intf = str(dev_path)
|
||||
enter_usb_bootloader(dev_path)
|
||||
await self._wait_bootloader(dev_path)
|
||||
self.serial_error = serial.SerialException
|
||||
try:
|
||||
serial_dev = serial.Serial(baudrate=baud, timeout=0,
|
||||
|
Loading…
x
Reference in New Issue
Block a user