flashtool: use recommended asyncio entry point

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2024-08-20 06:33:17 -04:00
parent 7de954b916
commit 730fde48ab

View File

@ -17,6 +17,7 @@ import hashlib
import pathlib import pathlib
import shutil import shutil
import shlex import shlex
import contextlib
from typing import Dict, List, Optional, Union from typing import Dict, List, Optional, Union
HAS_SERIAL = True HAS_SERIAL = True
try: try:
@ -586,9 +587,12 @@ class SerialSocket:
pid = fd_dir.parent.name pid = fd_dir.parent.name
if not pid.isdigit(): if not pid.isdigit():
continue continue
try: with contextlib.suppress(OSError):
for item in fd_dir.iterdir(): for item in fd_dir.iterdir():
item_st = item.stat() try:
item_st = item.stat()
except OSError:
continue
item_id = (item_st.st_dev, item_st.st_ino) item_id = (item_st.st_dev, item_st.st_ino)
if item_id == dev_id: if item_id == dev_id:
proc_name = await self._lookup_proc_name(pid) proc_name = await self._lookup_proc_name(pid)
@ -598,8 +602,6 @@ class SerialSocket:
f"Process {proc_name}" f"Process {proc_name}"
) )
raise FlashError(f"Serial device {dev_path} in use") raise FlashError(f"Serial device {dev_path} in use")
except PermissionError:
continue
async def run(self, intf: str, baud: int, fw_path: pathlib.Path) -> None: async def run(self, intf: str, baud: int, fw_path: pathlib.Path) -> None:
if not fw_path.is_file(): if not fw_path.is_file():
@ -633,7 +635,59 @@ class SerialSocket:
self.serial.close() self.serial.close()
self.serial = None self.serial = None
def main(): async def main(args: argparse.Namespace) -> int:
if not args.verbose:
logging.getLogger().setLevel(logging.ERROR)
intf = args.interface
fpath = pathlib.Path(args.firmware).expanduser().resolve()
loop = asyncio.get_running_loop()
iscan = args.device is None
req_only = args.request_bootloader
sock: CanSocket | SerialSocket | None = None
try:
if iscan:
sock = CanSocket(loop)
if args.query:
await sock.run_query(intf)
else:
if args.uuid is None:
raise FlashError(
"The 'uuid' option must be specified to flash a device"
)
output_line(f"Flashing CAN UUID {args.uuid} on interface {intf}")
uuid = int(args.uuid, 16)
await sock.run(intf, uuid, fpath, req_only)
else:
if not HAS_SERIAL:
ser_inst_cmd = "pip3 install serial"
if shutil.which("apt") is not None:
ser_inst_cmd = "sudo apt install python3-serial"
raise FlashError(
"The pyserial python package was not found. To install "
"run the following command in a terminal: \n\n"
f" {ser_inst_cmd}\n\n"
)
if args.device is None:
raise FlashError(
"The 'device' option must be specified to flash a device"
)
output_line(f"Flashing Serial Device {args.device}, baud {args.baud}")
sock = SerialSocket(loop)
await sock.run(args.device, args.baud, fpath)
except Exception:
logging.exception("Flash Error")
return -1
finally:
if sock is not None:
sock.close()
if args.query:
output_line("Query Complete")
else:
output_line("Flash Success")
return 0
if __name__ == '__main__':
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Katapult Flash Tool") description="Katapult Flash Tool")
parser.add_argument( parser.add_argument(
@ -670,55 +724,4 @@ def main():
) )
args = parser.parse_args() args = parser.parse_args()
if not args.verbose: exit(asyncio.run(main(args)))
logging.getLogger().setLevel(logging.ERROR)
intf = args.interface
fpath = pathlib.Path(args.firmware).expanduser().resolve()
loop = asyncio.get_event_loop()
iscan = args.device is None
req_only = args.request_bootloader
sock = None
try:
if iscan:
sock = CanSocket(loop)
if args.query:
loop.run_until_complete(sock.run_query(intf))
else:
if args.uuid is None:
raise FlashError(
"The 'uuid' option must be specified to flash a device"
)
output_line(f"Flashing CAN UUID {args.uuid} on interface {intf}")
uuid = int(args.uuid, 16)
loop.run_until_complete(sock.run(intf, uuid, fpath, req_only))
else:
if not HAS_SERIAL:
ser_inst_cmd = "pip3 install serial"
if shutil.which("apt") is not None:
ser_inst_cmd = "sudo apt install python3-serial"
raise FlashError(
"The pyserial python package was not found. To install "
"run the following command in a terminal: \n\n"
f" {ser_inst_cmd}\n\n"
)
if args.device is None:
raise FlashError(
"The 'device' option must be specified to flash a device"
)
output_line(f"Flashing Serial Device {args.device}, baud {args.baud}")
sock = SerialSocket(loop)
loop.run_until_complete(sock.run(args.device, args.baud, fpath))
except Exception:
logging.exception("Flash Error")
sys.exit(-1)
finally:
if sock is not None:
sock.close()
if args.query:
output_line("Query Complete")
else:
output_line("Flash Success")
if __name__ == '__main__':
main()