From 730fde48ab86ecb114ed1d6d930b53ba20e251bf Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Tue, 20 Aug 2024 06:33:17 -0400 Subject: [PATCH] flashtool: use recommended asyncio entry point Signed-off-by: Eric Callahan --- scripts/flashtool.py | 117 ++++++++++++++++++++++--------------------- 1 file changed, 60 insertions(+), 57 deletions(-) diff --git a/scripts/flashtool.py b/scripts/flashtool.py index f8718f4..fdfe571 100755 --- a/scripts/flashtool.py +++ b/scripts/flashtool.py @@ -17,6 +17,7 @@ import hashlib import pathlib import shutil import shlex +import contextlib from typing import Dict, List, Optional, Union HAS_SERIAL = True try: @@ -586,9 +587,12 @@ class SerialSocket: pid = fd_dir.parent.name if not pid.isdigit(): continue - try: + with contextlib.suppress(OSError): for item in fd_dir.iterdir(): - item_st = item.stat() + try: + item_st = item.stat() + except OSError: + continue item_id = (item_st.st_dev, item_st.st_ino) if item_id == dev_id: proc_name = await self._lookup_proc_name(pid) @@ -598,8 +602,6 @@ class SerialSocket: f"Process {proc_name}" ) raise FlashError(f"Serial device {dev_path} in use") - except PermissionError: - continue async def run(self, intf: str, baud: int, fw_path: pathlib.Path) -> None: if not fw_path.is_file(): @@ -633,7 +635,59 @@ class SerialSocket: self.serial.close() self.serial = None -def main(): +async def main(args: argparse.Namespace) -> int: + if not args.verbose: + logging.getLogger().setLevel(logging.ERROR) + intf = args.interface + fpath = pathlib.Path(args.firmware).expanduser().resolve() + loop = asyncio.get_running_loop() + iscan = args.device is None + req_only = args.request_bootloader + sock: CanSocket | SerialSocket | None = None + try: + if iscan: + sock = CanSocket(loop) + if args.query: + await sock.run_query(intf) + else: + if args.uuid is None: + raise FlashError( + "The 'uuid' option must be specified to flash a device" + ) + output_line(f"Flashing CAN UUID {args.uuid} on interface {intf}") + uuid = int(args.uuid, 16) + await sock.run(intf, uuid, fpath, req_only) + else: + if not HAS_SERIAL: + ser_inst_cmd = "pip3 install serial" + if shutil.which("apt") is not None: + ser_inst_cmd = "sudo apt install python3-serial" + raise FlashError( + "The pyserial python package was not found. To install " + "run the following command in a terminal: \n\n" + f" {ser_inst_cmd}\n\n" + ) + if args.device is None: + raise FlashError( + "The 'device' option must be specified to flash a device" + ) + output_line(f"Flashing Serial Device {args.device}, baud {args.baud}") + sock = SerialSocket(loop) + await sock.run(args.device, args.baud, fpath) + except Exception: + logging.exception("Flash Error") + return -1 + finally: + if sock is not None: + sock.close() + if args.query: + output_line("Query Complete") + else: + output_line("Flash Success") + return 0 + + +if __name__ == '__main__': parser = argparse.ArgumentParser( description="Katapult Flash Tool") parser.add_argument( @@ -670,55 +724,4 @@ def main(): ) args = parser.parse_args() - if not args.verbose: - logging.getLogger().setLevel(logging.ERROR) - intf = args.interface - fpath = pathlib.Path(args.firmware).expanduser().resolve() - loop = asyncio.get_event_loop() - iscan = args.device is None - req_only = args.request_bootloader - sock = None - try: - if iscan: - sock = CanSocket(loop) - if args.query: - loop.run_until_complete(sock.run_query(intf)) - else: - if args.uuid is None: - raise FlashError( - "The 'uuid' option must be specified to flash a device" - ) - output_line(f"Flashing CAN UUID {args.uuid} on interface {intf}") - uuid = int(args.uuid, 16) - loop.run_until_complete(sock.run(intf, uuid, fpath, req_only)) - else: - if not HAS_SERIAL: - ser_inst_cmd = "pip3 install serial" - if shutil.which("apt") is not None: - ser_inst_cmd = "sudo apt install python3-serial" - raise FlashError( - "The pyserial python package was not found. To install " - "run the following command in a terminal: \n\n" - f" {ser_inst_cmd}\n\n" - ) - if args.device is None: - raise FlashError( - "The 'device' option must be specified to flash a device" - ) - output_line(f"Flashing Serial Device {args.device}, baud {args.baud}") - sock = SerialSocket(loop) - loop.run_until_complete(sock.run(args.device, args.baud, fpath)) - except Exception: - logging.exception("Flash Error") - sys.exit(-1) - finally: - if sock is not None: - sock.close() - if args.query: - output_line("Query Complete") - else: - output_line("Flash Success") - - -if __name__ == '__main__': - main() + exit(asyncio.run(main(args)))