#!/usr/bin/python3 # Python 3 port of "CanSerial" # # Copyright (C) 2021 Eric Callahan # # This file may be distributed under the terms of the GNU GPLv3 license. import asyncio import socket import os import sys import pty import termios import fcntl import struct import time import logging import errno CAN_FMT = " 3: logging.info(f"Connection Timed Out: {self.uuid_str}") self.close() return self.ping_tries += 1 self.canserial.can_send(self.can_id) def handle_can_data(self, data): # TODO: we may need to make sure something is connected # to the other end of the pty. We shouldn't write to # it otherwise. self.last_recd_time = self.aio_loop.time() if not data: # This is pong self.ping_tries = 0 return try: os.write(self.pty_fd, data) except Exception: logging.exception("Error writing to pty") def respond_id_request(self): self.last_recd_time = self.aio_loop.time() can_id_bytes = struct.pack("= 16: packet = self.input_buffer[:16] self._process_packet(packet) self.input_buffer = self.input_buffer[16:] self.input_busy = False def _process_packet(self, packet): can_id, length, data = struct.unpack(CAN_FMT, packet) can_id &= socket.CAN_EFF_MASK payload = data[:length] canfunc = self.can_response_cbs.get(can_id) if canfunc is None: logging.info(f"No response callback for CAN ID {can_id:3X}'") return canfunc(payload) def _handle_set_id(self, uuid_bytes): if len(uuid_bytes) != CANBUS_UUID_LEN: logging.info(f"Invalid length for UUID: {uuid_bytes}") return uuid = ":".join([f"{b:02X}" for b in uuid_bytes]) can_id = None if uuid in self.mapped_can_devs: can_id = self.mapped_can_devs[uuid] if can_id in self.device_links: self.device_links[can_id].respond_id_request() return else: can_id = self._assign_can_port(uuid) new_dev = CanDeviceLink(self, uuid_bytes, can_id) self.device_links[can_id] = new_dev self.can_response_cbs[can_id + 1] = new_dev.handle_can_data def _assign_can_port(self, uuid): port = len(self.mapped_can_devs) + 1 with open(UUID_CONFIG_FILE, "a") as f: f.write(f"{port} {uuid}\n") can_id = 2 * port + CANBUS_PORT_OFFSET self.mapped_can_devs[uuid] = can_id return can_id def _request_can_status(self): # See Check for new devices self.can_send(CANBUS_ID_UUID) # Ping Each device for dev in list(self.device_links.values()): dev.ping() def can_send(self, can_id, payload=b""): if can_id > 0x7FF: can_id |= socket.CAN_EFF_FLAG if not payload: packet = struct.pack(CAN_FMT, can_id, 0, b"") self.output_packets.append(packet) else: while payload: length = min(len(payload), 8) pkt_data = payload[:length] payload = payload[length:] packet = struct.pack( CAN_FMT, can_id, length, pkt_data) self.output_packets.append(packet) if self.output_busy: return self.output_busy = True asyncio.create_task(self._do_can_send()) async def _do_can_send(self): while self.output_packets: packet = self.output_packets.pop(0) try: await self.aio_loop.sock_sendall(self.cansock, packet) except socket.error: logging.info("Socket Write Error, closing") self.close() break self.output_busy = False def start(self): try: self.cansock.bind(("can0",)) except Exception: raise CanSerialError("Unable to bind socket to can0") self.cansock.setblocking(False) self.aio_loop.add_reader( self.cansock.fileno(), self._handle_can_response) self.can_status_cb.start() self.aio_loop.run_forever() def close(self): self.can_status_cb.stop() self.aio_loop.remove_reader(self.cansock.fileno()) for dev in list(self.device_links.values()): dev.close() self.aio_loop.stop() def main(): while True: try: canserial = CanSerial() canserial.start() except Exception as e: logging.exception(str(e)) time.sleep(5.) if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) # Initialize UUID file if it doesn't exist if not os.path.exists(UUID_CONFIG_FILE): with open(UUID_CONFIG_FILE, "w") as f: f.write("# [port] [UUID]\n") main()