From e4fe25fdb6ca1f2cb1c0ffb1b44d9daae2ea8e8d Mon Sep 17 00:00:00 2001 From: Joshua Piccari Date: Sat, 5 Aug 2023 14:12:22 -0700 Subject: [PATCH] webrtc: add liveness check to WebRTC output using data channel Previously, there was no way to detect if WebRTC clients were still connected to the stream. This lead to the RTC streams being kept open indefinitely when clients focibly closed the stream. This can happen with Chrome, Edge, Safari, etc when the tab is closed or on mobile devices when the screen is locked or browser closed. This change adds a data channel to the stream and requires users to respond to ping requests. While sending a frame the ping might be sent to the client. If clients do not respond to these ping requests before the timeout duration the stream is closed and the client is removed from the server. This causes the client to reinitiate the connection. The timeout defaults to 30 seconds. The client needs to send `keepAlive: true` as part of webrtc request. On the received `ping` send back the `pong` over data-channel. --- html/webrtc.html | 8 +++++++ output/webrtc/webrtc.cc | 53 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/html/webrtc.html b/html/webrtc.html index 08e6692..4a03b82 100644 --- a/html/webrtc.html +++ b/html/webrtc.html @@ -73,6 +73,14 @@ sdpSemantics: 'unified-plan', iceServers: request.iceServers }); + + pc.addEventListener('datachannel', function(e) { + const dc = e.channel; + dc.addEventListener('message', function(e) { + dc.send('pong'); + }); + }); + pc.remote_pc_id = request.id; pc.addTransceiver('video', { direction: 'recvonly' }); pc.addEventListener('track', function(evt) { diff --git a/output/webrtc/webrtc.cc b/output/webrtc/webrtc.cc index ab36b45..19a1ac9 100644 --- a/output/webrtc/webrtc.cc +++ b/output/webrtc/webrtc.cc @@ -37,6 +37,7 @@ using namespace std::chrono_literals; class Client; +static pthread_t heartbeat_thread; static webrtc_options_t *webrtc_options; static std::set > webrtc_clients; static std::mutex webrtc_clients_lock; @@ -95,6 +96,8 @@ public: } id = "rtc-" + id; name = strdup(id.c_str()); + dc = pc->createDataChannel("pingpong"); + last_heartbeat_s = get_monotonic_time_us(NULL, NULL) / 1000 / 1000; } ~Client() @@ -164,6 +167,7 @@ public: char *name = NULL; std::string id; std::shared_ptr pc; + std::shared_ptr dc; std::shared_ptr video; std::mutex lock; std::condition_variable wait_for_complete; @@ -171,6 +175,7 @@ public: bool has_set_sdp_answer = false; bool had_key_frame = false; bool requested_key_frame = false; + uint64_t last_heartbeat_s; }; std::shared_ptr webrtc_find_client(std::string id) @@ -260,6 +265,20 @@ static std::shared_ptr webrtc_peer_connection(rtc::Configuration config, auto client = std::make_shared(pc); auto wclient = std::weak_ptr(client); + client->dc->onOpen([wclient]() { + if(auto client = wclient.lock()) { + LOG_DEBUG(client.get(), "data channel onOpen"); + } + }); + + client->dc->onMessage([wclient](auto message) { + auto client = wclient.lock(); + if(client && std::holds_alternative(message)) { + LOG_DEBUG(client.get(), "data channel onMessage: %s", std::get(message).c_str()); + client->last_heartbeat_s = get_monotonic_time_us(NULL, NULL) / 1000 / 1000; + } + }); + pc->onTrack([wclient](std::shared_ptr track) { if(auto client = wclient.lock()) { LOG_DEBUG(client.get(), "onTrack: %s", track->mid().c_str()); @@ -471,6 +490,38 @@ static void http_webrtc_remote_candidate(http_worker_t *worker, FILE *stream, co http_write_response(stream, "200 OK", "application/json", "{}", 0); } +static void *heartbeat_checker(void *) +{ + uint64_t disconnect = 10; + while (true) { + { + uint64_t now_s = get_monotonic_time_us(NULL, NULL) / 1000 / 1000; + std::unique_lock lk(webrtc_clients_lock); + auto it = webrtc_clients.begin(); + while (it != webrtc_clients.end()) { + auto client = *it; + if (!client) { + continue; + } + + uint64_t heartbeat_detla = now_s - client->last_heartbeat_s; + if (heartbeat_detla >= disconnect) { + LOG_INFO(client.get(), "No heartbeat from client, removing."); + it = webrtc_clients.erase(it); + } else { + if (heartbeat_detla > disconnect / 2) { + LOG_DEBUG(client.get(), "Checking if client still alive."); + client->dc->send("ping"); + } + it++; + } + } + } + sleep(1); + } + return NULL; +} + extern "C" void http_webrtc_offer(http_worker_t *worker, FILE *stream) { auto message = http_parse_json_body(worker, stream, webrtc_client_max_json_body); @@ -507,6 +558,8 @@ extern "C" int webrtc_server(webrtc_options_t *options) buffer_lock_register_check_streaming(&video_lock, webrtc_h264_needs_buffer); buffer_lock_register_notify_buffer(&video_lock, webrtc_h264_capture); + + pthread_create(&heartbeat_thread, NULL, heartbeat_checker, NULL); options->running = true; return 0; }