mirror of
https://github.com/andreili/katapult.git
synced 2025-08-23 19:34:06 +02:00
flash_can: Add support for flashing over serial
Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
parent
ad566230c9
commit
2821c1c807
@ -496,9 +496,69 @@ class CanSocket:
|
||||
self._loop.remove_reader(self.cansock.fileno())
|
||||
self.cansock.close()
|
||||
|
||||
class SerialSocket:
|
||||
def __init__(self, loop: asyncio.AbstractEventLoop):
|
||||
self._loop = loop
|
||||
self.serial = None
|
||||
self.node = CanNode(0, self)
|
||||
|
||||
def _handle_response(self) -> None:
|
||||
try:
|
||||
data = self.serial.read(4096)
|
||||
except self.serial.SerialException as e:
|
||||
logging.exception("Error on serial read")
|
||||
self.close()
|
||||
self.node.feed_data(data)
|
||||
|
||||
def send(self, can_id: int, payload: bytes = b"") -> None:
|
||||
try:
|
||||
self.serial.write(payload)
|
||||
except self.serial.SerialException as e:
|
||||
logging.exception("Error on serial read")
|
||||
self.close()
|
||||
|
||||
async def run(self, intf: str, baud: int, fw_path: pathlib.Path) -> None:
|
||||
if not fw_path.is_file():
|
||||
raise FlashCanError("Invalid firmware path '%s'" % (fw_path))
|
||||
import serial
|
||||
try:
|
||||
serial_dev = serial.Serial(baudrate=baud, timeout=0,
|
||||
exclusive=True)
|
||||
serial_dev.port = intf
|
||||
serial_dev.open()
|
||||
except (OSError, IOError, serial.SerialException) as e:
|
||||
logging.warn("Unable to open serial port: %s", e)
|
||||
self.serial = serial_dev
|
||||
self._loop.add_reader(self.serial.fileno(), self._handle_response)
|
||||
flasher = CanFlasher(self.node, fw_path)
|
||||
try:
|
||||
await flasher.connect_btl()
|
||||
await flasher.send_file()
|
||||
await flasher.verify_file()
|
||||
finally:
|
||||
# always attempt to send the complete command. If
|
||||
# there is an error it will exit the bootloader
|
||||
# unless comms were broken
|
||||
await flasher.finish()
|
||||
|
||||
def close(self):
|
||||
if self.serial is None:
|
||||
return
|
||||
self._loop.remove_reader(self.serial.fileno())
|
||||
self.serial.close()
|
||||
self.serial = None
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Can Bootloader Flash Utility")
|
||||
parser.add_argument(
|
||||
"-d", "--device", metavar='<serial device>',
|
||||
help="Serial Device"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-b", "--baud", default=250000, metavar='<baud rate>',
|
||||
help="Serial baud rate"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-i", "--interface", default="can0", metavar='<can interface>',
|
||||
help="Can Interface"
|
||||
@ -526,23 +586,33 @@ def main():
|
||||
intf = args.interface
|
||||
fpath = pathlib.Path(args.firmware).expanduser().resolve()
|
||||
loop = asyncio.get_event_loop()
|
||||
iscan = args.device is None
|
||||
sock = None
|
||||
try:
|
||||
cansock = CanSocket(loop)
|
||||
if iscan:
|
||||
sock = CanSocket(loop)
|
||||
if args.query:
|
||||
loop.run_until_complete(cansock.run_query(intf))
|
||||
loop.run_until_complete(sock.run_query(intf))
|
||||
else:
|
||||
if args.uuid is None:
|
||||
raise FlashCanError(
|
||||
"The 'uuid' option must be specified to flash a device"
|
||||
)
|
||||
uuid = int(args.uuid, 16)
|
||||
loop.run_until_complete(cansock.run(intf, uuid, fpath))
|
||||
loop.run_until_complete(sock.run(intf, uuid, fpath))
|
||||
else:
|
||||
if args.device is None:
|
||||
raise FlashCanError(
|
||||
"The 'device' option must be specified to flash a device"
|
||||
)
|
||||
sock = SerialSocket(loop)
|
||||
loop.run_until_complete(sock.run(args.device, args.baud, fpath))
|
||||
except Exception as e:
|
||||
logging.exception("Can Flash Error")
|
||||
sys.exit(-1)
|
||||
finally:
|
||||
if cansock is not None:
|
||||
cansock.close()
|
||||
if sock is not None:
|
||||
sock.close()
|
||||
if args.query:
|
||||
output_line("Query Complete")
|
||||
else:
|
||||
|
Loading…
x
Reference in New Issue
Block a user