webrtc: add ping/pong when sending a frame

This commit is contained in:
Kamil Trzciński 2025-07-02 23:14:38 +02:00
parent e4fe25fdb6
commit 2d3a488437
4 changed files with 90 additions and 53 deletions

View File

@ -12,6 +12,9 @@ LDLIBS := -lpthread -lstdc++
# Print #warnings # Print #warnings
CFLAGS += -Wno-error=cpp CFLAGS += -Wno-error=cpp
# LOG_*(this, ...)
CFLAGS += -Wno-error=nonnull-compare
# libdatachannel deprecations on bookworm # libdatachannel deprecations on bookworm
# error: 'HMAC_Init_ex' is deprecated: Since OpenSSL 3.0 # error: 'HMAC_Init_ex' is deprecated: Since OpenSSL 3.0
CFLAGS += -Wno-error=deprecated-declarations CFLAGS += -Wno-error=deprecated-declarations

View File

@ -759,7 +759,11 @@
]; ];
fetch(baseURL + webrtcURL, { fetch(baseURL + webrtcURL, {
body: JSON.stringify({type: 'request', iceServers: iceServers}), body: JSON.stringify({
type: 'request',
iceServers: iceServers,
keepAlive: true
}),
headers: {'Content-Type': 'application/json'}, headers: {'Content-Type': 'application/json'},
method: 'POST' method: 'POST'
}).then(function(response) { }).then(function(response) {
@ -769,6 +773,12 @@
sdpSemantics: 'unified-plan', sdpSemantics: 'unified-plan',
iceServers: request.iceServers iceServers: request.iceServers
}); });
rtcPeerConnection.addEventListener('datachannel', function(e) {
const dc = e.channel;
dc.addEventListener('message', function(e) {
dc.send('pong');
});
});
rtcPeerConnection.addTransceiver('video', { direction: 'recvonly' }); rtcPeerConnection.addTransceiver('video', { direction: 'recvonly' });
//pc.addTransceiver('audio', {direction: 'recvonly'}); //pc.addTransceiver('audio', {direction: 'recvonly'});
rtcPeerConnection.addEventListener('track', function(evt) { rtcPeerConnection.addEventListener('track', function(evt) {

View File

@ -60,7 +60,8 @@
body: JSON.stringify({ body: JSON.stringify({
type: 'request', type: 'request',
res: params.res, res: params.res,
iceServers: iceServers iceServers: iceServers,
keepAlive: true
}), }),
headers: { headers: {
'Content-Type': 'application/json' 'Content-Type': 'application/json'

View File

@ -16,6 +16,9 @@ extern "C" {
#include "util/opts/helpers.hh" #include "util/opts/helpers.hh"
#include "util/http/json.hh" #include "util/http/json.hh"
#define DEFAULT_PING_INTERVAL_US (1 * 1000 * 1000)
#define DEFAULT_DISCONNECT_INTERVAL_US (30 * 1000 * 1000)
#ifdef USE_LIBDATACHANNEL #ifdef USE_LIBDATACHANNEL
#include <string> #include <string>
@ -37,7 +40,6 @@ using namespace std::chrono_literals;
class Client; class Client;
static pthread_t heartbeat_thread;
static webrtc_options_t *webrtc_options; static webrtc_options_t *webrtc_options;
static std::set<std::shared_ptr<Client> > webrtc_clients; static std::set<std::shared_ptr<Client> > webrtc_clients;
static std::mutex webrtc_clients_lock; static std::mutex webrtc_clients_lock;
@ -49,6 +51,9 @@ static rtc::Configuration webrtc_configuration = {
.disableAutoNegotiation = true .disableAutoNegotiation = true
}; };
std::shared_ptr<Client> webrtc_find_client(std::string id);
void webrtc_remove_client(const std::shared_ptr<Client> &client, const char *reason);
struct ClientTrackData struct ClientTrackData
{ {
std::shared_ptr<rtc::Track> track; std::shared_ptr<rtc::Track> track;
@ -96,8 +101,7 @@ public:
} }
id = "rtc-" + id; id = "rtc-" + id;
name = strdup(id.c_str()); name = strdup(id.c_str());
dc = pc->createDataChannel("pingpong"); last_ping_us = last_pong_us = get_monotonic_time_us(NULL, NULL);
last_heartbeat_s = get_monotonic_time_us(NULL, NULL) / 1000 / 1000;
} }
~Client() ~Client()
@ -105,6 +109,38 @@ public:
free(name); free(name);
} }
void close()
{
if (pc)
pc->close();
}
bool wantsKeepAlive()
{
return dc_keepAlive != nullptr;
}
bool keepAlive()
{
if (!dc_keepAlive)
return true;
uint64_t now_us = get_monotonic_time_us(NULL, NULL);
if (dc_keepAlive->isOpen() && now_us - last_ping_us >= DEFAULT_PING_INTERVAL_US) {
LOG_DEBUG(this, "Checking if client still alive.");
dc_keepAlive->send("ping");
last_ping_us = now_us;
}
if (now_us - last_pong_us >= DEFAULT_DISCONNECT_INTERVAL_US) {
LOG_INFO(this, "No heartbeat from client.");
return false;
}
return true;
}
bool wantsFrame() const bool wantsFrame() const
{ {
if (!pc || !video) if (!pc || !video)
@ -167,7 +203,7 @@ public:
char *name = NULL; char *name = NULL;
std::string id; std::string id;
std::shared_ptr<rtc::PeerConnection> pc; std::shared_ptr<rtc::PeerConnection> pc;
std::shared_ptr<rtc::DataChannel> dc; std::shared_ptr<rtc::DataChannel> dc_keepAlive;
std::shared_ptr<ClientTrackData> video; std::shared_ptr<ClientTrackData> video;
std::mutex lock; std::mutex lock;
std::condition_variable wait_for_complete; std::condition_variable wait_for_complete;
@ -175,7 +211,8 @@ public:
bool has_set_sdp_answer = false; bool has_set_sdp_answer = false;
bool had_key_frame = false; bool had_key_frame = false;
bool requested_key_frame = false; bool requested_key_frame = false;
uint64_t last_heartbeat_s; uint64_t last_ping_us = 0;
uint64_t last_pong_us = 0;
}; };
std::shared_ptr<Client> webrtc_find_client(std::string id) std::shared_ptr<Client> webrtc_find_client(std::string id)
@ -190,7 +227,7 @@ std::shared_ptr<Client> webrtc_find_client(std::string id)
return std::shared_ptr<Client>(); return std::shared_ptr<Client>();
} }
static void webrtc_remove_client(const std::shared_ptr<Client> &client, const char *reason) void webrtc_remove_client(const std::shared_ptr<Client> &client, const char *reason)
{ {
std::unique_lock lk(webrtc_clients_lock); std::unique_lock lk(webrtc_clients_lock);
webrtc_clients.erase(client); webrtc_clients.erase(client);
@ -265,19 +302,27 @@ static std::shared_ptr<Client> webrtc_peer_connection(rtc::Configuration config,
auto client = std::make_shared<Client>(pc); auto client = std::make_shared<Client>(pc);
auto wclient = std::weak_ptr(client); auto wclient = std::weak_ptr(client);
client->dc->onOpen([wclient]() { if (message.value("keepAlive", false)) {
if(auto client = wclient.lock()) { LOG_INFO(client.get(), "Client supports Keep-Alives.");
LOG_DEBUG(client.get(), "data channel onOpen");
}
});
client->dc->onMessage([wclient](auto message) { client->dc_keepAlive = pc->createDataChannel("keepalive");
auto client = wclient.lock();
if(client && std::holds_alternative<rtc::string>(message)) { client->dc_keepAlive->onOpen([wclient]() {
LOG_DEBUG(client.get(), "data channel onMessage: %s", std::get<std::string>(message).c_str()); if(auto client = wclient.lock()) {
client->last_heartbeat_s = get_monotonic_time_us(NULL, NULL) / 1000 / 1000; LOG_DEBUG(client.get(), "data channel onOpen");
} }
}); });
client->dc_keepAlive->onMessage([wclient](auto message) {
auto client = wclient.lock();
if(client && std::holds_alternative<rtc::string>(message)) {
LOG_DEBUG(client.get(), "data channel onMessage: %s", std::get<std::string>(message).c_str());
client->last_pong_us = get_monotonic_time_us(NULL, NULL);
}
});
} else {
LOG_INFO(client.get(), "Client does not support Keep-Alives. This might result in stale streams.");
}
pc->onTrack([wclient](std::shared_ptr<rtc::Track> track) { pc->onTrack([wclient](std::shared_ptr<rtc::Track> track) {
if(auto client = wclient.lock()) { if(auto client = wclient.lock()) {
@ -342,10 +387,19 @@ static bool webrtc_h264_needs_buffer(buffer_lock_t *buf_lock)
static void webrtc_h264_capture(buffer_lock_t *buf_lock, buffer_t *buf) static void webrtc_h264_capture(buffer_lock_t *buf_lock, buffer_t *buf)
{ {
std::set<std::shared_ptr<Client> > lost_clients;
std::unique_lock lk(webrtc_clients_lock); std::unique_lock lk(webrtc_clients_lock);
for (auto client : webrtc_clients) { for (auto client : webrtc_clients) {
if (client->wantsFrame()) if (client->wantsFrame())
client->pushFrame(buf); client->pushFrame(buf);
if (!client->keepAlive())
lost_clients.insert(client);
}
lk.unlock();
for (auto lost_client : lost_clients) {
lost_client->close();
} }
} }
@ -369,6 +423,7 @@ static void http_webrtc_request(http_worker_t *worker, FILE *stream, const nlohm
message["id"] = client->id; message["id"] = client->id;
message["type"] = description->typeString(); message["type"] = description->typeString();
message["sdp"] = std::string(description.value()); message["sdp"] = std::string(description.value());
message["keepAlive"] = client->wantsKeepAlive();
client->describePeerConnection(message); client->describePeerConnection(message);
http_write_response(stream, "200 OK", "application/json", message.dump().c_str(), 0); http_write_response(stream, "200 OK", "application/json", message.dump().c_str(), 0);
LOG_VERBOSE(client.get(), "Local SDP Offer: %s", std::string(message["sdp"]).c_str()); LOG_VERBOSE(client.get(), "Local SDP Offer: %s", std::string(message["sdp"]).c_str());
@ -441,6 +496,7 @@ static void http_webrtc_offer(http_worker_t *worker, FILE *stream, const nlohman
nlohmann::json message; nlohmann::json message;
message["type"] = description->typeString(); message["type"] = description->typeString();
message["sdp"] = std::string(description.value()); message["sdp"] = std::string(description.value());
message["keepAlive"] = client->wantsKeepAlive();
client->describePeerConnection(message); client->describePeerConnection(message);
http_write_response(stream, "200 OK", "application/json", message.dump().c_str(), 0); http_write_response(stream, "200 OK", "application/json", message.dump().c_str(), 0);
@ -490,38 +546,6 @@ static void http_webrtc_remote_candidate(http_worker_t *worker, FILE *stream, co
http_write_response(stream, "200 OK", "application/json", "{}", 0); http_write_response(stream, "200 OK", "application/json", "{}", 0);
} }
static void *heartbeat_checker(void *)
{
uint64_t disconnect = 10;
while (true) {
{
uint64_t now_s = get_monotonic_time_us(NULL, NULL) / 1000 / 1000;
std::unique_lock lk(webrtc_clients_lock);
auto it = webrtc_clients.begin();
while (it != webrtc_clients.end()) {
auto client = *it;
if (!client) {
continue;
}
uint64_t heartbeat_detla = now_s - client->last_heartbeat_s;
if (heartbeat_detla >= disconnect) {
LOG_INFO(client.get(), "No heartbeat from client, removing.");
it = webrtc_clients.erase(it);
} else {
if (heartbeat_detla > disconnect / 2) {
LOG_DEBUG(client.get(), "Checking if client still alive.");
client->dc->send("ping");
}
it++;
}
}
}
sleep(1);
}
return NULL;
}
extern "C" void http_webrtc_offer(http_worker_t *worker, FILE *stream) extern "C" void http_webrtc_offer(http_worker_t *worker, FILE *stream)
{ {
auto message = http_parse_json_body(worker, stream, webrtc_client_max_json_body); auto message = http_parse_json_body(worker, stream, webrtc_client_max_json_body);
@ -559,7 +583,6 @@ extern "C" int webrtc_server(webrtc_options_t *options)
buffer_lock_register_check_streaming(&video_lock, webrtc_h264_needs_buffer); buffer_lock_register_check_streaming(&video_lock, webrtc_h264_needs_buffer);
buffer_lock_register_notify_buffer(&video_lock, webrtc_h264_capture); buffer_lock_register_notify_buffer(&video_lock, webrtc_h264_capture);
pthread_create(&heartbeat_thread, NULL, heartbeat_checker, NULL);
options->running = true; options->running = true;
return 0; return 0;
} }