Joshua Piccari e4fe25fdb6 webrtc: add liveness check to WebRTC output using data channel
Previously, there was no way to detect if WebRTC clients were still
connected to the stream. This led to the RTC streams being kept
open indefinitely when clients forcibly closed the stream. This can
happen with Chrome, Edge, Safari, etc. when the tab is closed, or on
mobile devices when the screen is locked or the browser is closed.

This change adds a data channel to the stream and requires users to
respond to ping requests. While sending a frame the ping might be
sent to the client. If clients do not respond to these ping requests before
the timeout duration the stream is closed and the client is removed
from the server. This causes the client to reinitiate the connection.

The timeout defaults to 30 seconds.

The client needs to send `keepAlive: true` as part of the webrtc request.
On receiving a `ping`, it must send back a `pong` over the data channel.
2025-07-03 00:07:59 +02:00

580 lines
18 KiB
C++

extern "C" {
#include "webrtc.h"
#include "device/buffer.h"
#include "device/buffer_list.h"
#include "device/buffer_lock.h"
#include "device/device.h"
#include "output/output.h"
#include "util/http/http.h"
#include "util/opts/log.h"
#include "util/opts/fourcc.h"
#include "util/opts/control.h"
#include "util/opts/opts.h"
#include "device/buffer.h"
};
#include "util/opts/helpers.hh"
#include "util/http/json.hh"
#ifdef USE_LIBDATACHANNEL
#include <string>
#include <memory>
#include <optional>
#include <condition_variable>
#include <atomic>
#include <chrono>
#include <set>
#include <rtc/peerconnection.hpp>
#include <rtc/rtcpsrreporter.hpp>
#include <rtc/h264rtppacketizer.hpp>
#include <rtc/h264packetizationhandler.hpp>
#include <rtc/rtcpnackresponder.hpp>
#include "third_party/magic_enum/include/magic_enum.hpp"
using namespace std::chrono_literals;
// Forward declaration so the client registry below can hold Client pointers
// before the class itself is defined.
class Client;
// Handle of the thread created by webrtc_server() to run heartbeat_checker().
static pthread_t heartbeat_thread;
// Options handed to webrtc_server(); consulted when parsing client-supplied ICE servers.
static webrtc_options_t *webrtc_options;
// Registry of all known clients. Accessed under webrtc_clients_lock throughout this file.
static std::set<std::shared_ptr<Client> > webrtc_clients;
static std::mutex webrtc_clients_lock;
// Maximum time a signalling request waits on a client's `wait_for_complete`
// condition (notified when ICE gathering completes).
static const auto webrtc_client_lock_timeout = 3 * 1000ms;
// Size limit passed to http_parse_json_body() for signalling requests.
static const auto webrtc_client_max_json_body = 10 * 1024;
static const auto webrtc_client_video_payload_type = 102; // H264
// Base configuration; copied (by value) for each new peer connection and
// extended with the ICE servers configured in webrtc_server().
static rtc::Configuration webrtc_configuration = {
// .iceServers = { rtc::IceServer("stun:stun.l.google.com:19302") },
.disableAutoNegotiation = true
};
// Pairs a client's outgoing video track with the RTCP sender-report handler
// whose RTP configuration carries the stream's timing.
struct ClientTrackData
{
  std::shared_ptr<rtc::Track> track;
  std::shared_ptr<rtc::RtcpSrReporter> sender;

  // Sets the RTP start time from the current monotonic clock reading and
  // starts sender-report recording.
  void startStreaming()
  {
    const double now_s = get_monotonic_time_us(NULL, NULL) / (1000.0 * 1000.0);
    sender->rtpConfig->setStartTime(now_s, rtc::RtpPacketizationConfig::EpochStart::T1970);
    sender->startRecording();
  }

  // Refreshes the RTP timestamp from the monotonic clock and flags a sender
  // report once more than one second of timestamps has passed since the last one.
  void sendTime()
  {
    const double now_s = get_monotonic_time_us(NULL, NULL) / (1000.0 * 1000.0);
    auto config = sender->rtpConfig;
    const uint32_t elapsed = config->secondsToTimestamp(now_s);
    config->timestamp = config->startTimestamp + elapsed;

    auto since_last_report = config->timestamp - sender->previousReportedTimestamp;
    if (config->timestampToSeconds(since_last_report) > 1) {
      sender->setNeedsToReport();
    }
  }

  // True only when a track exists and is currently open.
  bool wantsFrame() const
  {
    return track && track->isOpen();
  }
};
// State for one WebRTC viewer: the peer connection, its keep-alive data
// channel, the outgoing video track and the signalling bookkeeping.
class Client
{
public:
  // Generates a random "rtc-" prefixed id, creates the "pingpong" data channel
  // on the given peer connection and records the creation time as the first
  // heartbeat.
  Client(std::shared_ptr<rtc::PeerConnection> pc_)
    : pc(pc_)
  {
    id.resize(20);
    for (auto & c : id) {
      c = 'a' + (rand() % 26);
    }
    id = "rtc-" + id;
    name = strdup(id.c_str());
    dc = pc->createDataChannel("pingpong");
    last_heartbeat_s = get_monotonic_time_us(NULL, NULL) / 1000 / 1000;
  }

  ~Client()
  {
    free(name);
  }

  // A frame is wanted only once the peer connection is Connected and the
  // video track reports itself open.
  bool wantsFrame() const
  {
    if (!pc || !video)
      return false;
    if (pc->state() != rtc::PeerConnection::State::Connected)
      return false;
    return video->wantsFrame();
  }

  // Sends one encoded buffer on the video track. Frames are dropped until the
  // first keyframe has been seen; a keyframe is requested from the device once.
  void pushFrame(buffer_t *buf)
  {
    if (!video || !video->track) {
      return;
    }
    if (!had_key_frame) {
      had_key_frame = buf->flags.is_keyframe;
    }
    if (!had_key_frame) {
      if (!requested_key_frame) {
        device_video_force_key(buf->buf_list->dev);
        requested_key_frame = true;
      }
      return;
    }
    rtc::binary data((std::byte*)buf->start, (std::byte*)buf->start + buf->used);
    video->sendTime();
    video->track->send(data);
  }

  // Writes the peer connection's ICE servers into message["iceServers"],
  // including username/credential for TURN entries.
  void describePeerConnection(nlohmann::json &message)
  {
    nlohmann::json ice_servers = nlohmann::json::array();
    for (const auto &ice_server : pc->config()->iceServers) {
      nlohmann::json json;
      std::string url;
      if (ice_server.type == rtc::IceServer::Type::Turn) {
        json["username"] = ice_server.username;
        json["credential"] = ice_server.password;
        url = ice_server.relayType == rtc::IceServer::RelayType::TurnTls ? "turns:" : "turn:";
      } else {
        url = "stun:";
      }
      url += ice_server.hostname + ":" + std::to_string(ice_server.port);
      json["urls"] = url;
      ice_servers.push_back(json);
    }
    message["iceServers"] = ice_servers;
  }

public:
  // NOTE(review): deliberately kept as the first data member; Client pointers
  // are handed to the LOG_* macros — confirm whether they rely on this layout.
  char *name = NULL;
  std::string id;
  std::shared_ptr<rtc::PeerConnection> pc;
  std::shared_ptr<rtc::DataChannel> dc;
  std::shared_ptr<ClientTrackData> video;
  std::mutex lock;
  std::condition_variable wait_for_complete;
  std::vector<rtc::Candidate> pending_remote_candidates;
  bool has_set_sdp_answer = false;
  bool had_key_frame = false;
  bool requested_key_frame = false;
  // Monotonic time (seconds) of the last data-channel message. Atomic because
  // it is written from the data-channel callback and read by the heartbeat
  // thread without any common lock.
  std::atomic<uint64_t> last_heartbeat_s{0};
};
// Looks up a registered client by its id while holding the registry lock.
// Returns an empty pointer when no client matches.
std::shared_ptr<Client> webrtc_find_client(std::string id)
{
  std::unique_lock guard(webrtc_clients_lock);

  for (const auto &candidate : webrtc_clients) {
    if (!candidate || candidate->id != id)
      continue;
    return candidate;
  }

  return {};
}
// Drops `client` from the registry (under the registry lock) and logs why.
static void webrtc_remove_client(const std::shared_ptr<Client> &client, const char *reason)
{
  std::lock_guard<std::mutex> guard(webrtc_clients_lock);

  webrtc_clients.erase(client);
  LOG_INFO(client.get(), "Client removed: %s.", reason);
}
// Adds a send-only H264 video track to `pc` and wires up its media handler
// chain: H264 packetizer -> RTCP sender reports -> NACK responder.
// Returns the track together with its sender-report handler.
static std::shared_ptr<ClientTrackData> webrtc_add_video(const std::shared_ptr<rtc::PeerConnection> pc, const uint8_t payloadType, const uint32_t ssrc, const std::string cname, const std::string msid)
{
  // Describe the media section offered to the peer.
  rtc::Description::Video media(cname, rtc::Description::Direction::SendOnly);
  media.addH264Codec(payloadType);
  media.setBitrate(1000);
  media.addSSRC(ssrc, cname, msid, cname);
  auto videoTrack = pc->addTrack(media);

  // Build the packetization chain.
  auto config = std::make_shared<rtc::RtpPacketizationConfig>(
    ssrc, cname, payloadType, rtc::H264RtpPacketizer::defaultClockRate);
  auto h264Packetizer = std::make_shared<rtc::H264RtpPacketizer>(
    rtc::H264RtpPacketizer::Separator::LongStartSequence, config);
  auto handler = std::make_shared<rtc::H264PacketizationHandler>(h264Packetizer);

  auto reporter = std::make_shared<rtc::RtcpSrReporter>(config);
  handler->addToChain(reporter);

  auto nack = std::make_shared<rtc::RtcpNackResponder>();
  handler->addToChain(nack);

  videoTrack->setMediaHandler(handler);

  auto result = std::make_shared<ClientTrackData>();
  result->track = videoTrack;
  result->sender = reporter;
  return result;
}
// Appends the ICE servers listed in message["iceServers"] (if any) to `config`.
//
// The list comes from the HTTP client and is untrusted: every entry is parsed
// inside a try block and a malformed entry is logged and skipped. Nothing is
// added when `disable_client_ice` is set in the server options.
static void webrtc_parse_ice_servers(rtc::Configuration &config, const nlohmann::json &message)
{
  auto ice_servers = message.find("iceServers");
  if (ice_servers == message.end() || !ice_servers->is_array())
    return;

  if (webrtc_options->disable_client_ice) {
    LOG_VERBOSE(NULL, "ICE server from SDP request ignored due to `disable_client_ice`: %s",
      ice_servers->dump().c_str());
    return;
  }

  for (const auto& ice_server : *ice_servers) {
    try {
      // at() throws on a missing key or non-object entry; operator[] on a
      // const json with a missing key is undefined behaviour.
      nlohmann::json urls = ice_server.at("urls");

      // convert non array to array
      if (!urls.is_array()) {
        urls = nlohmann::json::array();
        urls.push_back(ice_server.at("urls"));
      }

      for (const auto& url : urls) {
        // rtc::IceServer throws std::invalid_argument on a malformed URL.
        auto iceServer = rtc::IceServer(url.get<std::string>());
        if (iceServer.type == rtc::IceServer::Type::Turn) {
          if (ice_server.contains("username"))
            iceServer.username = ice_server.at("username").get<std::string>();
          if (ice_server.contains("credential"))
            iceServer.password = ice_server.at("credential").get<std::string>();
        }
        config.iceServers.push_back(iceServer);
        LOG_VERBOSE(NULL, "Added ICE server from SDP request json: %s", url.dump().c_str());
      }
    } catch (const std::exception &e) {
      // Covers both JSON errors and invalid ICE URLs so that bad client input
      // cannot propagate an exception out of the request handler.
      LOG_VERBOSE(NULL, "Failed to parse ICE server: %s: %s",
        ice_server.dump().c_str(), e.what());
    }
  }
}
// Creates a peer connection (using `config` plus any client-supplied ICE
// servers), wraps it in a Client, installs all callbacks and registers the
// client in the global registry.
//
// Callbacks capture only a weak reference to the client, so they become no-ops
// once the client has been destroyed.
static std::shared_ptr<Client> webrtc_peer_connection(rtc::Configuration config, const nlohmann::json &message)
{
  webrtc_parse_ice_servers(config, message);

  auto pc = std::make_shared<rtc::PeerConnection>(config);
  auto client = std::make_shared<Client>(pc);
  std::weak_ptr<Client> weak = client;

  client->dc->onOpen([weak]() {
    auto self = weak.lock();
    if (!self)
      return;
    LOG_DEBUG(self.get(), "data channel onOpen");
  });

  // Any text message on the data channel counts as a heartbeat.
  client->dc->onMessage([weak](auto payload) {
    auto self = weak.lock();
    if (!self || !std::holds_alternative<rtc::string>(payload))
      return;
    LOG_DEBUG(self.get(), "data channel onMessage: %s", std::get<std::string>(payload).c_str());
    self->last_heartbeat_s = get_monotonic_time_us(NULL, NULL) / 1000 / 1000;
  });

  pc->onTrack([weak](std::shared_ptr<rtc::Track> track) {
    auto self = weak.lock();
    if (!self)
      return;
    LOG_DEBUG(self.get(), "onTrack: %s", track->mid().c_str());
  });

  pc->onLocalDescription([weak](rtc::Description description) {
    auto self = weak.lock();
    if (!self)
      return;
    LOG_DEBUG(self.get(), "onLocalDescription: %s", description.typeString().c_str());
  });

  pc->onSignalingStateChange([weak](rtc::PeerConnection::SignalingState state) {
    auto self = weak.lock();
    if (!self)
      return;
    LOG_DEBUG(self.get(), "onSignalingStateChange: %d", (int)state);
  });

  pc->onStateChange([weak](rtc::PeerConnection::State state) {
    auto self = weak.lock();
    if (!self)
      return;

    LOG_DEBUG(self.get(), "onStateChange: %d", (int)state);

    switch (state) {
    case rtc::PeerConnection::State::Connected: {
      // Start streaming once the client is connected, to ensure a keyframe is sent to start the stream.
      std::unique_lock lock(self->lock);
      self->video->startStreaming();
      break;
    }
    case rtc::PeerConnection::State::Disconnected:
    case rtc::PeerConnection::State::Failed:
    case rtc::PeerConnection::State::Closed:
      webrtc_remove_client(self, "stream closed");
      break;
    default:
      break;
    }
  });

  pc->onGatheringStateChange([weak](rtc::PeerConnection::GatheringState state) {
    auto self = weak.lock();
    if (!self)
      return;

    LOG_DEBUG(self.get(), "onGatheringStateChange: %d", (int)state);
    if (state != rtc::PeerConnection::GatheringState::Complete)
      return;
    // Wake up the HTTP handler waiting for the complete local description.
    self->wait_for_complete.notify_all();
  });

  std::unique_lock registry_guard(webrtc_clients_lock);
  webrtc_clients.insert(client);
  return client;
}
// Streaming check registered on the video buffer lock: reports whether at
// least one registered client currently wants a frame.
static bool webrtc_h264_needs_buffer(buffer_lock_t *buf_lock)
{
  std::unique_lock guard(webrtc_clients_lock);

  bool wanted = false;
  for (auto it = webrtc_clients.begin(); !wanted && it != webrtc_clients.end(); ++it) {
    wanted = (*it)->wantsFrame();
  }
  return wanted;
}
// Buffer notification registered on the video buffer lock: forwards the
// captured buffer to every client that currently wants a frame.
static void webrtc_h264_capture(buffer_lock_t *buf_lock, buffer_t *buf)
{
  std::unique_lock guard(webrtc_clients_lock);

  for (const auto &receiver : webrtc_clients) {
    if (!receiver->wantsFrame())
      continue;
    receiver->pushFrame(buf);
  }
}
// Handles a `type: "request"` message: creates a client, generates the local
// SDP offer and returns it (with the client id and ICE servers) as JSON.
//
// Responds 500 when ICE gathering does not complete within
// webrtc_client_lock_timeout or when any step throws; in both cases the
// half-initialised client is removed from the registry again.
static void http_webrtc_request(http_worker_t *worker, FILE *stream, const nlohmann::json &message)
{
  std::shared_ptr<Client> client;

  try {
    // Client creation and track setup can throw as well, so they live inside
    // the try block to keep exceptions from escaping the handler.
    client = webrtc_peer_connection(webrtc_configuration, message);
    LOG_INFO(client.get(), "Stream requested.");
    client->video = webrtc_add_video(client->pc, webrtc_client_video_payload_type, rand(), "video", "");

    {
      std::unique_lock lock(client->lock);
      client->pc->setLocalDescription();
      // The predicate guards against spurious wakeups and against gathering
      // having completed before the wait started.
      client->wait_for_complete.wait_for(lock, webrtc_client_lock_timeout, [&client]() {
        return client->pc->gatheringState() == rtc::PeerConnection::GatheringState::Complete;
      });
    }

    auto description = client->pc->localDescription();
    if (client->pc->gatheringState() != rtc::PeerConnection::GatheringState::Complete || !description) {
      http_500(stream, "Not complete");
      // The caller never learns the client id, so the client is unusable.
      webrtc_remove_client(client, "ICE gathering did not complete");
      return;
    }

    nlohmann::json response;
    response["id"] = client->id;
    response["type"] = description->typeString();
    response["sdp"] = std::string(description.value());
    client->describePeerConnection(response);
    http_write_response(stream, "200 OK", "application/json", response.dump().c_str(), 0);
    LOG_VERBOSE(client.get(), "Local SDP Offer: %s", response["sdp"].get<std::string>().c_str());
  } catch(const std::exception &e) {
    http_500(stream, e.what());
    if (client) {
      webrtc_remove_client(client, e.what());
    }
  }
}
// Handles a `type: "answer"` message: applies the remote SDP answer to the
// client identified by message["id"] and flushes any ICE candidates that
// arrived before the answer.
//
// Responds 400 on missing/non-string fields, 404 for an unknown id and 500
// (removing the client) when applying the answer fails.
static void http_webrtc_answer(http_worker_t *worker, FILE *stream, const nlohmann::json &message)
{
  if (!message.contains("id") || !message.contains("sdp")) {
    http_400(stream, "no sdp or id");
    return;
  }
  // Reject wrong types up front; converting a non-string json value to
  // std::string throws.
  if (!message["id"].is_string() || !message["sdp"].is_string()) {
    http_400(stream, "sdp or id is not a string");
    return;
  }

  auto client = webrtc_find_client(message["id"].get<std::string>());
  if (!client) {
    http_404(stream, "No client found");
    return;
  }

  LOG_INFO(client.get(), "Answer received.");
  LOG_VERBOSE(client.get(), "Remote SDP Answer: %s", message["sdp"].get<std::string>().c_str());

  try {
    auto answer = rtc::Description(message["sdp"].get<std::string>(), message.at("type").get<std::string>());

    {
      // The candidate handler reads `has_set_sdp_answer` and appends to
      // `pending_remote_candidates` under this lock, so take it here too.
      std::unique_lock lock(client->lock);
      client->pc->setRemoteDescription(answer);
      client->has_set_sdp_answer = true;

      // If there are any pending candidates that make it in before the answer request, add them now.
      for(auto const &candidate : client->pending_remote_candidates) {
        client->pc->addRemoteCandidate(candidate);
      }
      client->pending_remote_candidates.clear();
    }

    http_write_response(stream, "200 OK", "application/json", "{}", 0);
  } catch(const std::exception &e) {
    http_500(stream, e.what());
    webrtc_remove_client(client, e.what());
  }
}
// Handles a `type: "offer"` message: creates a client for the remote SDP
// offer, generates the local SDP answer and returns it (with the ICE servers)
// as JSON.
//
// Responds 400 when `sdp` is missing and 500 when the offer cannot be parsed,
// any step throws, or ICE gathering does not complete within
// webrtc_client_lock_timeout. A client created for a failed request is
// removed from the registry again.
static void http_webrtc_offer(http_worker_t *worker, FILE *stream, const nlohmann::json &message)
{
  if (!message.contains("sdp")) {
    http_400(stream, "no sdp");
    return;
  }

  std::shared_ptr<Client> client;

  try {
    // Parsing the offer and creating the client can throw on bad input, so
    // both happen inside the try block.
    auto offer = rtc::Description(message.at("sdp").get<std::string>(), message.at("type").get<std::string>());
    client = webrtc_peer_connection(webrtc_configuration, message);
    LOG_INFO(client.get(), "Offer received.");
    LOG_VERBOSE(client.get(), "Remote SDP Offer: %s", message.at("sdp").get<std::string>().c_str());

    client->video = webrtc_add_video(client->pc, webrtc_client_video_payload_type, rand(), "video", "");

    {
      std::unique_lock lock(client->lock);
      client->pc->setRemoteDescription(offer);
      client->has_set_sdp_answer = true;
      client->pc->setLocalDescription();
      // The predicate guards against spurious wakeups and against gathering
      // having completed before the wait started.
      client->wait_for_complete.wait_for(lock, webrtc_client_lock_timeout, [&client]() {
        return client->pc->gatheringState() == rtc::PeerConnection::GatheringState::Complete;
      });
    }

    auto description = client->pc->localDescription();
    if (client->pc->gatheringState() != rtc::PeerConnection::GatheringState::Complete || !description) {
      http_500(stream, "Not complete");
      // No answer was delivered, so the connection can never be established.
      webrtc_remove_client(client, "ICE gathering did not complete");
      return;
    }

    nlohmann::json response;
    response["type"] = description->typeString();
    response["sdp"] = std::string(description.value());
    client->describePeerConnection(response);
    http_write_response(stream, "200 OK", "application/json", response.dump().c_str(), 0);
    LOG_VERBOSE(client.get(), "Local SDP Answer: %s", response["sdp"].get<std::string>().c_str());
  } catch(const std::exception &e) {
    http_500(stream, e.what());
    if (client) {
      webrtc_remove_client(client, e.what());
    }
  }
}
// Handles a `type: "remote_candidate"` message: adds each entry of
// message["candidates"] to the client identified by message["id"], or queues
// it when the SDP answer has not been applied yet.
//
// Responds 400 on malformed input (including an invalid candidate, at which
// point processing stops) and 404 for an unknown id.
static void http_webrtc_remote_candidate(http_worker_t *worker, FILE *stream, const nlohmann::json &message)
{
  if (!message.contains("candidates") || !message.contains("id") || !message["candidates"].is_array()) {
    http_400(stream, "candidates is not array or no id");
    return;
  }
  // Converting a non-string json value to std::string throws.
  if (!message["id"].is_string()) {
    http_400(stream, "id is not a string");
    return;
  }

  auto client = webrtc_find_client(message["id"].get<std::string>());
  if (!client) {
    http_404(stream, "No client found");
    return;
  }

  for (auto const & entry : message["candidates"]) {
    try {
      // at() throws on a missing key; operator[] on a const json with a
      // missing key is undefined behaviour.
      auto remoteCandidate = rtc::Candidate(
        entry.at("candidate").get<std::string>(),
        entry.at("sdpMid").get<std::string>());

      std::unique_lock lock(client->lock);
      // The ICE candidate http requests can race the sdp answer http request and win. But it's invalid to set the ICE
      // candidates before the SDP answer is set.
      if (client->has_set_sdp_answer) {
        client->pc->addRemoteCandidate(remoteCandidate);
      } else {
        client->pending_remote_candidates.push_back(remoteCandidate);
      }
    } catch (const std::exception &e) {
      // Covers JSON errors as well as candidates rejected by libdatachannel.
      http_400(stream, e.what());
      return;
    }
  }

  http_write_response(stream, "200 OK", "application/json", "{}", 0);
}
// Thread entry point: once per second, walks the client registry and
//  - removes clients whose last heartbeat is at least `disconnect` seconds old,
//  - sends "ping" on the data channel of clients that have been silent for
//    more than half of that time.
// Never returns.
static void *heartbeat_checker(void *)
{
  // NOTE(review): the change description accompanying this file mentions a
  // 30 second default; the value used here is 10 seconds — confirm.
  const uint64_t disconnect = 10;

  while (true) {
    {
      std::unique_lock lk(webrtc_clients_lock);
      const uint64_t now_s = get_monotonic_time_us(NULL, NULL) / 1000 / 1000;

      auto it = webrtc_clients.begin();
      while (it != webrtc_clients.end()) {
        auto client = *it;
        if (!client) {
          // Advance before continuing; otherwise the loop would spin forever
          // on this entry while holding the registry lock.
          ++it;
          continue;
        }

        // The heartbeat is updated from another thread and may be newer than
        // `now_s`; clamp instead of letting the unsigned subtraction wrap.
        const uint64_t last_s = client->last_heartbeat_s;
        const uint64_t heartbeat_delta = now_s > last_s ? now_s - last_s : 0;

        if (heartbeat_delta >= disconnect) {
          LOG_INFO(client.get(), "No heartbeat from client, removing.");
          it = webrtc_clients.erase(it);
          continue;
        }

        if (heartbeat_delta > disconnect / 2) {
          LOG_DEBUG(client.get(), "Checking if client still alive.");
          // Sending can throw when the channel is not open; an exception
          // leaving a thread entry point would terminate the process.
          try {
            if (client->dc && client->dc->isOpen()) {
              client->dc->send("ping");
            }
          } catch (const std::exception &e) {
            LOG_DEBUG(client.get(), "Failed to send ping: %s", e.what());
          }
        }
        ++it;
      }
    }
    sleep(1);
  }
  return NULL;
}
// C entry point for the WebRTC signalling endpoint. Parses the JSON body and
// dispatches on its `type` field ("request", "answer", "offer",
// "remote_candidate").
//
// Responds 400 for a missing/non-string or unknown `type`. No C++ exception
// may cross this extern "C" boundary, so anything thrown while parsing or
// handling the request is turned into a 500 response.
extern "C" void http_webrtc_offer(http_worker_t *worker, FILE *stream)
{
  try {
    auto message = http_parse_json_body(worker, stream, webrtc_client_max_json_body);

    // A non-string `type` would throw on conversion, so treat it as missing.
    if (!message.contains("type") || !message["type"].is_string()) {
      http_400(stream, "missing 'type'");
      return;
    }

    const std::string type = message["type"].get<std::string>();
    LOG_DEBUG(worker, "Recevied: '%s'", type.c_str());

    if (type == "request") {
      http_webrtc_request(worker, stream, message);
    } else if (type == "answer") {
      http_webrtc_answer(worker, stream, message);
    } else if (type == "offer") {
      http_webrtc_offer(worker, stream, message);
    } else if (type == "remote_candidate") {
      http_webrtc_remote_candidate(worker, stream, message);
    } else {
      http_400(stream, (std::string("Not expected: " + type)).c_str());
    }
  } catch (const std::exception &e) {
    http_500(stream, e.what());
  }
}
// C entry point that starts the WebRTC output: stores the options, adds the
// configured ICE servers to the base configuration, starts the heartbeat
// thread and registers the H264 buffer callbacks.
//
// Returns 0 on success and -1 when the heartbeat thread cannot be started
// (in which case `options->running` is left untouched).
extern "C" int webrtc_server(webrtc_options_t *options)
{
  webrtc_options = options;

  for (const auto &ice_server : str_split(options->ice_servers, OPTION_VALUE_LIST_SEP_CHAR)) {
    const std::string url(ice_server);
    // rtc::IceServer throws on a malformed URL; an exception must not cross
    // this extern "C" boundary, so log and skip the entry instead.
    try {
      webrtc_configuration.iceServers.push_back(rtc::IceServer(url));
    } catch (const std::exception &e) {
      LOG_INFO(NULL, "Ignoring invalid ICE server '%s': %s", url.c_str(), e.what());
    }
  }

  // Start the liveness checker before registering callbacks so that a failure
  // here does not leave the output half set up.
  if (pthread_create(&heartbeat_thread, NULL, heartbeat_checker, NULL) != 0) {
    LOG_INFO(NULL, "Failed to start WebRTC heartbeat thread.");
    return -1;
  }

  buffer_lock_register_check_streaming(&video_lock, webrtc_h264_needs_buffer);
  buffer_lock_register_notify_buffer(&video_lock, webrtc_h264_capture);

  options->running = true;
  return 0;
}
#else // USE_LIBDATACHANNEL
// Fallback used when the build has no libdatachannel support: the WebRTC
// endpoint always answers with 404.
extern "C" void http_webrtc_offer(http_worker_t *worker, FILE *stream)
{
  (void)worker; // unused in the stub
  http_404(stream, NULL);
}
// Fallback used when the build has no libdatachannel support: nothing is
// started and success (0) is reported.
extern "C" int webrtc_server(webrtc_options_t *options)
{
  (void)options; // unused in the stub
  return 0;
}
#endif // USE_LIBDATACHANNEL