diff --git a/scripts/flash_can.py b/scripts/flash_can.py index 08f9435..fdb162a 100755 --- a/scripts/flash_can.py +++ b/scripts/flash_can.py @@ -50,13 +50,7 @@ BOOTLOADER_CMDS = { } ACK_SUCCESS = 0xa0 -NACK_RESPONSES = { - 0xf0: "Invalid Command", - 0xf1: "CRC Mismatch", - 0xf2: "Invalid Payload", - 0xf3: "Invalid Trailer", - 0xf4: "Invalid Payload Length" -} +NACK = 0xf1 # Klipper Admin Defs (for jumping to bootloader) KLIPPER_ADMIN_ID = 0x3f0 @@ -87,20 +81,26 @@ class CanFlasher: self.file_size = 0 self.block_size = 64 self.block_count = 0 + self.app_start_addr = 0 async def connect_btl(self): output_line("Attempting to connect to bootloader") ret = await self.send_command('CONNECT') - self.block_size, = struct.unpack(">H", ret[2:]) + ver_bytes, start_addr, self.block_size = struct.unpack("<4sII", ret) + self.app_start_addr = start_addr + proto_version = ".".join([str(v) for v in reversed(ver_bytes[:3])]) if self.block_size not in [64, 128, 256, 512]: raise FlashCanError("Invalid Block Size: %d" % (self.block_size,)) - output_line("Connected, Block Size: %d bytes" % (self.block_size,)) + output_line( + f"CanBoot Connected\nProtocol Version: {proto_version}\n" + f"Block Size: {self.block_size} bytes\n" + f"Application Start: 0x{self.app_start_addr:4X}" + ) async def send_command( self, cmdname: str, payload: bytes = b"", - response_length: int = 12, tries: int = 5 ) -> bytearray: word_cnt = (len(payload) // 4) & 0xFF @@ -110,31 +110,37 @@ class CanFlasher: out_cmd.append(word_cnt) if payload: out_cmd.extend(payload) + crc = crc16_ccitt(out_cmd[2:]) + out_cmd.extend(struct.pack("H", crc)) while tries: + data = bytearray() + recd_len = 0 try: - ret = await self.node.write_with_response( - out_cmd, response_length - ) - data = bytearray(ret) + self.node.write(out_cmd) + read_done = False + while not read_done: + ret = await self.node.readuntil() + data.extend(ret) + while len(data) > 7: + if data[:2] != CMD_HEADER: + data = data[1:] + continue + recd_len = data[3] * 4 + read_done = len(data) == recd_len + 8 + break + except Exception: logging.exception("Can Read Error") else: - header = data[:2] - trailer = data[-4:-2] - recd_crc, = struct.unpack(">H", data[-2:]) - calc_crc = crc16_ccitt(data[:-2]) + trailer = data[-2:] + recd_crc, = struct.unpack("H", data[6:8]) - msg += f", sent crc: {crc}, mcu_crc: {mcu_crc}" - logging.info(msg) + logging.info(f"Command '{cmdname}': Received NACK") elif cmd_response != cmd: logging.info( f"Command '{cmdname}': Acknowledged wrong command, " f"expected: {cmd:2x}, received: {cmd_response:2x}" ) else: - # Validation passed, return payload - return data[4:recd_len + 4] + # Validation passed, return payload sans command + if recd_len <= 4: + return bytearray() + return data[8:recd_len + 4] tries -= 1 # clear the read buffer try: - await self.node.read(256, timeout=.1) + await self.node.read(1024, timeout=.1) except asyncio.TimeoutError: pass await asyncio.sleep(.1) @@ -179,6 +180,7 @@ class CanFlasher: f.seek(0, os.SEEK_END) self.file_size = f.tell() f.seek(0) + flash_address = self.app_start_addr while True: buf = f.read(self.block_size) if not buf: @@ -186,21 +188,22 @@ class CanFlasher: if len(buf) < self.block_size: buf += b"\xFF" * (self.block_size - len(buf)) self.fw_sha.update(buf) - prefix = struct.pack(">I", self.block_count) + prefix = struct.pack("H", resp[2:]) - if recd_block == self.block_count: + recd_addr, = struct.unpack("H", resp[2:]) + page_count, = struct.unpack("I", i) - resp = await self.send_command("REQUEST_BLOCK", payload, 76) - recd_block, = struct.unpack(">H", resp[2:4]) - if recd_block == i: + payload = struct.pack(" bytes: return await asyncio.wait_for(self._reader.readexactly(n), timeout) + async def readuntil( + self, sep: bytes = b"\x03", timeout: Optional[float] = 2 + ) -> bytes: + return await asyncio.wait_for(self._reader.readuntil(sep), timeout) + def write(self, payload: Union[bytes, bytearray]) -> None: if isinstance(payload, bytearray): payload = bytes(payload) @@ -491,8 +500,14 @@ def main(): "-q", "--query", action="store_true", help="Query Bootloader Device IDs" ) + parser.add_argument( + "-v", "--verbose", action="store_true", + help="Enable verbose responses" + ) args = parser.parse_args() + if not args.verbose: + logging.getLogger().setLevel(logging.CRITICAL) intf = args.interface fpath = pathlib.Path(args.firmware).expanduser().resolve() loop = asyncio.get_event_loop()