Revert "flash_can: support entering the usb bootloader"

This reverts commit b267acc6b3fcadab07f9755f9a6f409c298870f7.
This commit is contained in:
Eric Callahan 2022-06-03 14:07:06 -04:00
parent b267acc6b3
commit 7574bcffa1
No known key found for this signature in database
GPG Key ID: 7027245FBBDDF59A

View File

@ -5,20 +5,16 @@
#
# This file may be distributed under the terms of the GNU GPLv3 license.
from __future__ import annotations
import re
import sys
import os
import asyncio
import socket
import struct
import fcntl
import termios
import logging
import errno
import argparse
import hashlib
import pathlib
import shlex
from typing import Dict, List, Optional, Union
def output_line(msg: str) -> None:
@ -500,29 +496,6 @@ class CanSocket:
self._loop.remove_reader(self.cansock.fileno())
self.cansock.close()
##########################################################
#
# USB Bootloader Entry Helpers:
# Sourced from flash_usb.py in Klipper
#
# Copyright (C) 2019 Kevin O'Connor <kevin@koconnor.net>
#
##########################################################
def enter_usb_bootloader(device):
try:
f = open(device, 'rb')
fd = f.fileno()
fcntl.ioctl(fd, termios.TIOCMBIS, struct.pack('I', termios.TIOCM_DTR))
t = termios.tcgetattr(fd)
t[4] = t[5] = termios.B1200
output_line("Entering bootloader on %s\n" % (device,))
termios.tcsetattr(fd, termios.TCSANOW, t)
fcntl.ioctl(fd, termios.TIOCMBIC, struct.pack('I', termios.TIOCM_DTR))
f.close()
except (IOError, OSError) as e:
pass
class SerialSocket:
def __init__(self, loop: asyncio.AbstractEventLoop):
self._loop = loop
@ -537,52 +510,6 @@ class SerialSocket:
self.close()
self.node.feed_data(data)
async def _wait_bootloader(self, device_path: pathlib.Path) -> bool:
maxsleep = 10.
total_sleep = 0.
output("Entering USB Bootloader...")
while total_sleep < maxsleep:
await asyncio.sleep(.5)
output(".")
total_sleep += .5
if device_path.exists():
output_line("found")
return True
output_line("Failed to detect usb device")
return False
async def _is_klipper_usb(self, ttyname: str) -> bool:
udev_cmd = shlex.split(f"udevadm info -n {ttyname}")
try:
proc = await asyncio.subprocess.create_subprocess_exec(
*udev_cmd, stdout=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
ret = await proc.wait()
if ret != 0:
return False
data = stdout.decode()
logging.info(f"udevadm result:\n{data}")
match = re.search(r"ID_VENDOR=([0-9a-zA-Z]+)", data)
if match is not None and match.group(1).lower() == "klipper":
return True
except Exception:
logging.exception("udevadm error")
return False
async def _find_usb_path(self, device: str) -> Optional[pathlib.Path]:
ttypath = pathlib.Path(device).resolve()
usb_path = pathlib.Path("/dev/serial/by-path")
if not usb_path.exists():
return None
for devpath in usb_path.iterdir():
if (
devpath.samefile(ttypath) and
await self._is_klipper_usb(ttypath)
):
return devpath
return None
def send(self, can_id: int, payload: bytes = b"") -> None:
try:
self.serial.write(payload)
@ -594,12 +521,6 @@ class SerialSocket:
if not fw_path.is_file():
raise FlashCanError("Invalid firmware path '%s'" % (fw_path))
import serial
dev_path = await self._find_usb_path(intf)
if dev_path is not None:
# Klipper usb device detected, attempt to enter bootloader
intf = str(dev_path)
enter_usb_bootloader(dev_path)
await self._wait_bootloader(dev_path)
self.serial_error = serial.SerialException
try:
serial_dev = serial.Serial(baudrate=baud, timeout=0,