From 2d3a4884378f384346680a55196bf9244b99b6b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Trzci=C5=84ski?= Date: Wed, 2 Jul 2025 23:14:38 +0200 Subject: [PATCH] webrtc: add ping/pong when sending a frame --- Makefile | 3 + html/control.html | 12 +++- html/webrtc.html | 3 +- output/webrtc/webrtc.cc | 125 ++++++++++++++++++++++++---------------- 4 files changed, 90 insertions(+), 53 deletions(-) diff --git a/Makefile b/Makefile index 2400dd1..7c5290d 100644 --- a/Makefile +++ b/Makefile @@ -12,6 +12,9 @@ LDLIBS := -lpthread -lstdc++ # Print #warnings CFLAGS += -Wno-error=cpp +# LOG_*(this, ...) +CFLAGS += -Wno-error=nonnull-compare + # libdatachannel deprecations on bookworm # error: 'HMAC_Init_ex' is deprecated: Since OpenSSL 3.0 CFLAGS += -Wno-error=deprecated-declarations diff --git a/html/control.html b/html/control.html index 9ff41d9..ea9b0c7 100644 --- a/html/control.html +++ b/html/control.html @@ -759,7 +759,11 @@ ]; fetch(baseURL + webrtcURL, { - body: JSON.stringify({type: 'request', iceServers: iceServers}), + body: JSON.stringify({ + type: 'request', + iceServers: iceServers, + keepAlive: true + }), headers: {'Content-Type': 'application/json'}, method: 'POST' }).then(function(response) { @@ -769,6 +773,12 @@ sdpSemantics: 'unified-plan', iceServers: request.iceServers }); + rtcPeerConnection.addEventListener('datachannel', function(e) { + const dc = e.channel; + dc.addEventListener('message', function(e) { + dc.send('pong'); + }); + }); rtcPeerConnection.addTransceiver('video', { direction: 'recvonly' }); //pc.addTransceiver('audio', {direction: 'recvonly'}); rtcPeerConnection.addEventListener('track', function(evt) { diff --git a/html/webrtc.html b/html/webrtc.html index 4a03b82..6da573f 100644 --- a/html/webrtc.html +++ b/html/webrtc.html @@ -60,7 +60,8 @@ body: JSON.stringify({ type: 'request', res: params.res, - iceServers: iceServers + iceServers: iceServers, + keepAlive: true }), headers: { 'Content-Type': 'application/json' diff --git a/output/webrtc/webrtc.cc b/output/webrtc/webrtc.cc index 19a1ac9..7fecc2e 100644 --- a/output/webrtc/webrtc.cc +++ b/output/webrtc/webrtc.cc @@ -16,6 +16,9 @@ extern "C" { #include "util/opts/helpers.hh" #include "util/http/json.hh" +#define DEFAULT_PING_INTERVAL_US (1 * 1000 * 1000) +#define DEFAULT_DISCONNECT_INTERVAL_US (30 * 1000 * 1000) + #ifdef USE_LIBDATACHANNEL #include @@ -37,7 +40,6 @@ using namespace std::chrono_literals; class Client; -static pthread_t heartbeat_thread; static webrtc_options_t *webrtc_options; static std::set > webrtc_clients; static std::mutex webrtc_clients_lock; @@ -49,6 +51,9 @@ static rtc::Configuration webrtc_configuration = { .disableAutoNegotiation = true }; +std::shared_ptr webrtc_find_client(std::string id); +void webrtc_remove_client(const std::shared_ptr &client, const char *reason); + struct ClientTrackData { std::shared_ptr track; @@ -96,8 +101,7 @@ public: } id = "rtc-" + id; name = strdup(id.c_str()); - dc = pc->createDataChannel("pingpong"); - last_heartbeat_s = get_monotonic_time_us(NULL, NULL) / 1000 / 1000; + last_ping_us = last_pong_us = get_monotonic_time_us(NULL, NULL); } ~Client() @@ -105,6 +109,38 @@ public: free(name); } + void close() + { + if (pc) + pc->close(); + } + + bool wantsKeepAlive() + { + return dc_keepAlive != nullptr; + } + + bool keepAlive() + { + if (!dc_keepAlive) + return true; + + uint64_t now_us = get_monotonic_time_us(NULL, NULL); + + if (dc_keepAlive->isOpen() && now_us - last_ping_us >= DEFAULT_PING_INTERVAL_US) { + LOG_DEBUG(this, "Checking if client still alive."); + dc_keepAlive->send("ping"); + last_ping_us = now_us; + } + + if (now_us - last_pong_us >= DEFAULT_DISCONNECT_INTERVAL_US) { + LOG_INFO(this, "No heartbeat from client."); + return false; + } + + return true; + } + bool wantsFrame() const { if (!pc || !video) @@ -167,7 +203,7 @@ public: char *name = NULL; std::string id; std::shared_ptr pc; - std::shared_ptr dc; + std::shared_ptr dc_keepAlive; std::shared_ptr video; std::mutex lock; std::condition_variable wait_for_complete; @@ -175,7 +211,8 @@ public: bool has_set_sdp_answer = false; bool had_key_frame = false; bool requested_key_frame = false; - uint64_t last_heartbeat_s; + uint64_t last_ping_us = 0; + uint64_t last_pong_us = 0; }; std::shared_ptr webrtc_find_client(std::string id) @@ -190,7 +227,7 @@ std::shared_ptr webrtc_find_client(std::string id) return std::shared_ptr(); } -static void webrtc_remove_client(const std::shared_ptr &client, const char *reason) +void webrtc_remove_client(const std::shared_ptr &client, const char *reason) { std::unique_lock lk(webrtc_clients_lock); webrtc_clients.erase(client); @@ -265,19 +302,27 @@ static std::shared_ptr webrtc_peer_connection(rtc::Configuration config, auto client = std::make_shared(pc); auto wclient = std::weak_ptr(client); - client->dc->onOpen([wclient]() { - if(auto client = wclient.lock()) { - LOG_DEBUG(client.get(), "data channel onOpen"); - } - }); + if (message.value("keepAlive", false)) { + LOG_INFO(client.get(), "Client supports Keep-Alives."); - client->dc->onMessage([wclient](auto message) { - auto client = wclient.lock(); - if(client && std::holds_alternative(message)) { - LOG_DEBUG(client.get(), "data channel onMessage: %s", std::get(message).c_str()); - client->last_heartbeat_s = get_monotonic_time_us(NULL, NULL) / 1000 / 1000; - } - }); + client->dc_keepAlive = pc->createDataChannel("keepalive"); + + client->dc_keepAlive->onOpen([wclient]() { + if(auto client = wclient.lock()) { + LOG_DEBUG(client.get(), "data channel onOpen"); + } + }); + + client->dc_keepAlive->onMessage([wclient](auto message) { + auto client = wclient.lock(); + if(client && std::holds_alternative(message)) { + LOG_DEBUG(client.get(), "data channel onMessage: %s", std::get(message).c_str()); + client->last_pong_us = get_monotonic_time_us(NULL, NULL); + } + }); + } else { + LOG_INFO(client.get(), "Client does not support Keep-Alives. This might result in stale streams."); + } pc->onTrack([wclient](std::shared_ptr track) { if(auto client = wclient.lock()) { @@ -342,10 +387,19 @@ static bool webrtc_h264_needs_buffer(buffer_lock_t *buf_lock) static void webrtc_h264_capture(buffer_lock_t *buf_lock, buffer_t *buf) { + std::set > lost_clients; + std::unique_lock lk(webrtc_clients_lock); for (auto client : webrtc_clients) { if (client->wantsFrame()) client->pushFrame(buf); + if (!client->keepAlive()) + lost_clients.insert(client); + } + lk.unlock(); + + for (auto lost_client : lost_clients) { + lost_client->close(); } } @@ -369,6 +423,7 @@ static void http_webrtc_request(http_worker_t *worker, FILE *stream, const nlohm message["id"] = client->id; message["type"] = description->typeString(); message["sdp"] = std::string(description.value()); + message["keepAlive"] = client->wantsKeepAlive(); client->describePeerConnection(message); http_write_response(stream, "200 OK", "application/json", message.dump().c_str(), 0); LOG_VERBOSE(client.get(), "Local SDP Offer: %s", std::string(message["sdp"]).c_str()); @@ -441,6 +496,7 @@ static void http_webrtc_offer(http_worker_t *worker, FILE *stream, const nlohman nlohmann::json message; message["type"] = description->typeString(); message["sdp"] = std::string(description.value()); + message["keepAlive"] = client->wantsKeepAlive(); client->describePeerConnection(message); http_write_response(stream, "200 OK", "application/json", message.dump().c_str(), 0); @@ -490,38 +546,6 @@ static void http_webrtc_remote_candidate(http_worker_t *worker, FILE *stream, co http_write_response(stream, "200 OK", "application/json", "{}", 0); } -static void *heartbeat_checker(void *) -{ - uint64_t disconnect = 10; - while (true) { - { - uint64_t now_s = get_monotonic_time_us(NULL, NULL) / 1000 / 1000; - std::unique_lock lk(webrtc_clients_lock); - auto it = webrtc_clients.begin(); - while (it != webrtc_clients.end()) { - auto client = *it; - if (!client) { - continue; - } - - uint64_t heartbeat_detla = now_s - client->last_heartbeat_s; - if (heartbeat_detla >= disconnect) { - LOG_INFO(client.get(), "No heartbeat from client, removing."); - it = webrtc_clients.erase(it); - } else { - if (heartbeat_detla > disconnect / 2) { - LOG_DEBUG(client.get(), "Checking if client still alive."); - client->dc->send("ping"); - } - it++; - } - } - } - sleep(1); - } - return NULL; -} - extern "C" void http_webrtc_offer(http_worker_t *worker, FILE *stream) { auto message = http_parse_json_body(worker, stream, webrtc_client_max_json_body); @@ -559,7 +583,6 @@ extern "C" int webrtc_server(webrtc_options_t *options) buffer_lock_register_check_streaming(&video_lock, webrtc_h264_needs_buffer); buffer_lock_register_notify_buffer(&video_lock, webrtc_h264_capture); - pthread_create(&heartbeat_thread, NULL, heartbeat_checker, NULL); options->running = true; return 0; }