diff --git a/CMakeLists.txt b/CMakeLists.txt index 86eb1e49..30ecbcf8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -76,6 +76,7 @@ set(FFI_PROTO_FILES ${FFI_PROTO_DIR}/e2ee.proto ${FFI_PROTO_DIR}/stats.proto ${FFI_PROTO_DIR}/data_stream.proto + ${FFI_PROTO_DIR}/data_track.proto ${FFI_PROTO_DIR}/rpc.proto ${FFI_PROTO_DIR}/track_publication.proto ) @@ -324,6 +325,7 @@ add_library(livekit SHARED src/audio_source.cpp src/audio_stream.cpp src/data_stream.cpp + src/data_track_subscription.cpp src/e2ee.cpp src/ffi_handle.cpp src/ffi_client.cpp @@ -331,7 +333,9 @@ add_library(livekit SHARED src/livekit.cpp src/logging.cpp src/local_audio_track.cpp + src/local_data_track.cpp src/remote_audio_track.cpp + src/remote_data_track.cpp src/room.cpp src/room_proto_converter.cpp src/room_proto_converter.h @@ -683,10 +687,6 @@ install(FILES # Build the LiveKit C++ bridge before examples (human_robot depends on it) add_subdirectory(bridge) -# ---- Examples ---- -# add_subdirectory(examples) - - if(LIVEKIT_BUILD_EXAMPLES) add_subdirectory(examples) endif() diff --git a/README.md b/README.md index eb473f26..b318e0d5 100644 --- a/README.md +++ b/README.md @@ -447,6 +447,35 @@ CPP SDK is using clang C++ format brew install clang-format ``` + +#### Memory Checks +Run valgrind on various examples or tests to check for memory leaks and other issues. +```bash +valgrind --leak-check=full ./build-debug/bin/livekit_integration_tests +valgrind --leak-check=full ./build-debug/bin/livekit_stress_tests +``` + +# Running locally +1. Install the livekit-server +https://docs.livekit.io/transport/self-hosting/local/ + +Start the livekit-server with data tracks enabled: +```bash +LIVEKIT_CONFIG="enable_data_tracks: true" livekit-server --dev +``` + +```bash +# generate tokens, do for all participants +lk token create \ + --api-key devkey \ + --api-secret secret \ + -i robot \ + --join \ + --valid-for 99999h \ + --room robo_room \ + --grant '{"canPublish":true,"canSubscribe":true,"canPublishData":true}' +``` +
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 19a37781..da98cb33 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -43,6 +43,10 @@ set(EXAMPLES_ALL SimpleJoystickSender SimpleJoystickReceiver SimpleDataStream + SimpleStatusProducer + SimpleStatusConsumer + HelloLivekitSender + HelloLivekitReceiver LoggingLevelsBasicUsage LoggingLevelsCustomSinks BridgeRobot @@ -242,6 +246,58 @@ add_custom_command( $/data ) +# --- simple_status (producer + consumer text stream on producer-status) --- + +add_executable(SimpleStatusProducer + simple_status/producer.cpp +) + +target_include_directories(SimpleStatusProducer PRIVATE ${EXAMPLES_PRIVATE_INCLUDE_DIRS}) + +target_link_libraries(SimpleStatusProducer + PRIVATE + livekit + spdlog::spdlog +) + +add_executable(SimpleStatusConsumer + simple_status/consumer.cpp +) + +target_include_directories(SimpleStatusConsumer PRIVATE ${EXAMPLES_PRIVATE_INCLUDE_DIRS}) + +target_link_libraries(SimpleStatusConsumer + PRIVATE + livekit + spdlog::spdlog +) + +# --- hello_livekit (minimal synthetic video + data publish / subscribe) --- + +add_executable(HelloLivekitSender + hello_livekit/sender.cpp +) + +target_include_directories(HelloLivekitSender PRIVATE ${EXAMPLES_PRIVATE_INCLUDE_DIRS}) + +target_link_libraries(HelloLivekitSender + PRIVATE + livekit + spdlog::spdlog +) + +add_executable(HelloLivekitReceiver + hello_livekit/receiver.cpp +) + +target_include_directories(HelloLivekitReceiver PRIVATE ${EXAMPLES_PRIVATE_INCLUDE_DIRS}) + +target_link_libraries(HelloLivekitReceiver + PRIVATE + livekit + spdlog::spdlog +) + # --- bridge_human_robot examples (robot + human; use livekit_bridge and SDL3) --- add_executable(BridgeRobot diff --git a/examples/bridge_human_robot/human.cpp b/examples/bridge_human_robot/human.cpp index 81989eb5..3e8c553d 100644 --- a/examples/bridge_human_robot/human.cpp +++ b/examples/bridge_human_robot/human.cpp @@ -103,6 +103,11 @@ static void renderFrame(const livekit::VideoFrame &frame) { static std::atomic g_audio_frames{0}; static std::atomic g_video_frames{0}; +constexpr const char *kRobotMicTrackName = "robot-mic"; +constexpr const char *kRobotSimAudioTrackName = "robot-sim-audio"; +constexpr const char *kRobotCamTrackName = "robot-cam"; +constexpr const char *kRobotSimVideoTrackName = "robot-sim-frame"; + int main(int argc, char *argv[]) { // ----- Parse args / env ----- bool no_audio = false; @@ -232,7 +237,7 @@ int main(int argc, char *argv[]) { // ----- Set audio callbacks using Room::setOnAudioFrameCallback ----- room->setOnAudioFrameCallback( - "robot", livekit::TrackSource::SOURCE_MICROPHONE, + "robot", kRobotMicTrackName, [playAudio, no_audio](const livekit::AudioFrame &frame) { g_audio_frames.fetch_add(1, std::memory_order_relaxed); if (!no_audio && g_selected_source.load(std::memory_order_relaxed) == @@ -242,7 +247,7 @@ int main(int argc, char *argv[]) { }); room->setOnAudioFrameCallback( - "robot", livekit::TrackSource::SOURCE_SCREENSHARE_AUDIO, + "robot", kRobotSimAudioTrackName, [playAudio, no_audio](const livekit::AudioFrame &frame) { g_audio_frames.fetch_add(1, std::memory_order_relaxed); if (!no_audio && g_selected_source.load(std::memory_order_relaxed) == @@ -253,7 +258,7 @@ int main(int argc, char *argv[]) { // ----- Set video callbacks using Room::setOnVideoFrameCallback ----- room->setOnVideoFrameCallback( - "robot", livekit::TrackSource::SOURCE_CAMERA, + "robot", kRobotCamTrackName, [](const livekit::VideoFrame &frame, std::int64_t /*timestamp_us*/) { g_video_frames.fetch_add(1, std::memory_order_relaxed); if (g_selected_source.load(std::memory_order_relaxed) == @@ -263,7 +268,7 @@ int main(int argc, char *argv[]) { }); room->setOnVideoFrameCallback( - "robot", livekit::TrackSource::SOURCE_SCREENSHARE, + "robot", kRobotSimVideoTrackName, [](const livekit::VideoFrame &frame, std::int64_t /*timestamp_us*/) { g_video_frames.fetch_add(1, std::memory_order_relaxed); if (g_selected_source.load(std::memory_order_relaxed) == diff --git a/examples/hello_livekit/receiver.cpp b/examples/hello_livekit/receiver.cpp new file mode 100644 index 00000000..bc05e5f2 --- /dev/null +++ b/examples/hello_livekit/receiver.cpp @@ -0,0 +1,130 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// Subscribes to the sender's camera video and data track. Run +/// HelloLivekitSender first; use the identity it prints, or the sender's known +/// participant name. +/// +/// Usage: +/// HelloLivekitReceiver +/// +/// Or via environment variables: +/// LIVEKIT_URL, LIVEKIT_RECEIVER_TOKEN, LIVEKIT_SENDER_IDENTITY + +#include "livekit/livekit.h" + +#include +#include +#include +#include +#include + +using namespace livekit; + +constexpr const char *kDataTrackName = "app-data"; +constexpr const char *kVideoTrackName = "camera0"; + +std::atomic g_running{true}; + +void handleSignal(int) { g_running.store(false); } + +std::string getenvOrEmpty(const char *name) { + const char *v = std::getenv(name); + return v ? std::string(v) : std::string{}; +} + +int main(int argc, char *argv[]) { + std::string url = getenvOrEmpty("LIVEKIT_URL"); + std::string receiver_token = getenvOrEmpty("LIVEKIT_RECEIVER_TOKEN"); + std::string sender_identity = getenvOrEmpty("LIVEKIT_SENDER_IDENTITY"); + + if (argc >= 4) { + url = argv[1]; + receiver_token = argv[2]; + sender_identity = argv[3]; + } + + if (url.empty() || receiver_token.empty() || sender_identity.empty()) { + LK_LOG_ERROR("Usage: HelloLivekitReceiver " + "\n" + " or set LIVEKIT_URL, LIVEKIT_RECEIVER_TOKEN, " + "LIVEKIT_SENDER_IDENTITY"); + return 1; + } + + std::signal(SIGINT, handleSignal); +#ifdef SIGTERM + std::signal(SIGTERM, handleSignal); +#endif + + livekit::initialize(livekit::LogLevel::Info, livekit::LogSink::kConsole); + + auto room = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + options.dynacast = false; + + if (!room->Connect(url, receiver_token, options)) { + LK_LOG_ERROR("[receiver] Failed to connect"); + livekit::shutdown(); + return 1; + } + + LocalParticipant *lp = room->localParticipant(); + assert(lp); + + LK_LOG_INFO("[receiver] Connected as identity='{}' room='{}'; subscribing " + "to sender identity='{}'", + lp->identity(), room->room_info().name, sender_identity); + + int video_frame_count = 0; + room->setOnVideoFrameCallback( + sender_identity, kVideoTrackName, + [&video_frame_count](const VideoFrame &frame, std::int64_t timestamp_us) { + const auto ts_ms = + std::chrono::duration(timestamp_us).count(); + const int n = video_frame_count++; + if (n % 10 == 0) { + LK_LOG_INFO("[receiver] Video frame #{} {}x{} ts_ms={}", n, + frame.width(), frame.height(), ts_ms); + } + }); + + int data_frame_count = 0; + room->addOnDataFrameCallback( + sender_identity, kDataTrackName, + [&data_frame_count](const std::vector &payload, + std::optional user_ts) { + const int n = data_frame_count++; + if (n % 10 == 0) { + LK_LOG_INFO("[receiver] Data frame #{}", n); + } + }); + + LK_LOG_INFO("[receiver] Listening for video track '{}' + data track '{}'; " + "Ctrl-C to exit", + kVideoTrackName, kDataTrackName); + + while (g_running.load()) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + LK_LOG_INFO("[receiver] Shutting down"); + room.reset(); + + livekit::shutdown(); + return 0; +} diff --git a/examples/hello_livekit/sender.cpp b/examples/hello_livekit/sender.cpp new file mode 100644 index 00000000..3ca70d5f --- /dev/null +++ b/examples/hello_livekit/sender.cpp @@ -0,0 +1,129 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// Publishes synthetic RGBA video and a data track. Run the receiver in another +/// process and pass this participant's identity (printed after connect). +/// +/// Usage: +/// HelloLivekitSender +/// +/// Or via environment variables: +/// LIVEKIT_URL, LIVEKIT_SENDER_TOKEN + +#include "livekit/livekit.h" + +#include +#include +#include +#include +#include +#include +#include + +using namespace livekit; + +constexpr int kWidth = 640; +constexpr int kHeight = 480; +constexpr const char *kVideoTrackName = "camera0"; +constexpr const char *kDataTrackName = "app-data"; + +std::atomic g_running{true}; + +void handleSignal(int) { g_running.store(false); } + +std::string getenvOrEmpty(const char *name) { + const char *v = std::getenv(name); + return v ? std::string(v) : std::string{}; +} + +int main(int argc, char *argv[]) { + std::string url = getenvOrEmpty("LIVEKIT_URL"); + std::string sender_token = getenvOrEmpty("LIVEKIT_SENDER_TOKEN"); + + if (argc >= 3) { + url = argv[1]; + sender_token = argv[2]; + } + + if (url.empty() || sender_token.empty()) { + LK_LOG_ERROR( + "Usage: HelloLivekitSender \n" + " or set LIVEKIT_URL, LIVEKIT_SENDER_TOKEN"); + return 1; + } + + std::signal(SIGINT, handleSignal); +#ifdef SIGTERM + std::signal(SIGTERM, handleSignal); +#endif + + livekit::initialize(livekit::LogLevel::Info, livekit::LogSink::kConsole); + + auto room = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + options.dynacast = false; + + if (!room->Connect(url, sender_token, options)) { + LK_LOG_ERROR("[sender] Failed to connect"); + livekit::shutdown(); + return 1; + } + + LocalParticipant *lp = room->localParticipant(); + assert(lp); + + LK_LOG_INFO("[sender] Connected as identity='{}' room='{}' — pass this " + "identity to HelloLivekitReceiver", + lp->identity(), room->room_info().name); + + auto video_source = std::make_shared(kWidth, kHeight); + + std::shared_ptr video_track = lp->publishVideoTrack( + kVideoTrackName, video_source, TrackSource::SOURCE_CAMERA); + + std::shared_ptr data_track = + lp->publishDataTrack(kDataTrackName); + + const auto t0 = std::chrono::steady_clock::now(); + std::uint64_t count = 0; + + LK_LOG_INFO( + "[sender] Publishing synthetic video + data on '{}'; Ctrl-C to exit", + kDataTrackName); + + while (g_running.load()) { + VideoFrame vf = VideoFrame::create(kWidth, kHeight, VideoBufferType::RGBA); + video_source->captureFrame(std::move(vf)); + + const auto now = std::chrono::steady_clock::now(); + const double ms = + std::chrono::duration(now - t0).count(); + std::ostringstream oss; + oss << std::fixed << std::setprecision(2) << ms << " ms, count: " << count; + const std::string msg = oss.str(); + data_track->tryPush(std::vector(msg.begin(), msg.end())); + + ++count; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + LK_LOG_INFO("[sender] Disconnecting"); + room.reset(); + + livekit::shutdown(); + return 0; +} diff --git a/examples/simple_status/consumer.cpp b/examples/simple_status/consumer.cpp new file mode 100644 index 00000000..2859e8b9 --- /dev/null +++ b/examples/simple_status/consumer.cpp @@ -0,0 +1,127 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// Consumer participant: creates 3 independent data track subscriptions to the +/// producer's "status" data track and logs each frame with the subscriber +/// index. Use a token whose identity is `consumer`. + +#include "livekit/livekit.h" +#include "livekit/lk_log.h" + +#include +#include +#include +#include +#include +#include +#include + +using namespace livekit; + +namespace { + +constexpr const char *kProducerIdentity = "producer"; +constexpr const char *kTrackName = "status"; +constexpr int kNumSubscribers = 3; + +std::atomic g_running{true}; + +void handleSignal(int) { g_running.store(false); } + +std::string getenvOrEmpty(const char *name) { + const char *v = std::getenv(name); + return v ? std::string(v) : std::string{}; +} + +} // namespace + +int main(int argc, char *argv[]) { + std::string url = getenvOrEmpty("LIVEKIT_URL"); + std::string token = getenvOrEmpty("LIVEKIT_TOKEN"); + + if (argc >= 3) { + url = argv[1]; + token = argv[2]; + } + + if (url.empty() || token.empty()) { + LK_LOG_ERROR("LIVEKIT_URL and LIVEKIT_TOKEN (or ) are " + "required"); + return 1; + } + + std::signal(SIGINT, handleSignal); +#ifdef SIGTERM + std::signal(SIGTERM, handleSignal); +#endif + + livekit::initialize(livekit::LogLevel::Info, livekit::LogSink::kConsole); + + auto room = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + options.dynacast = false; + + if (!room->Connect(url, token, options)) { + LK_LOG_ERROR("Failed to connect to room"); + livekit::shutdown(); + return 1; + } + + LocalParticipant *lp = room->localParticipant(); + if (!lp) { + LK_LOG_ERROR("No local participant after connect"); + room->setDelegate(nullptr); + room.reset(); + livekit::shutdown(); + return 1; + } + + LK_LOG_INFO("consumer connected as identity='{}' room='{}'", lp->identity(), + room->room_info().name); + + std::vector sub_ids; + sub_ids.reserve(kNumSubscribers); + + for (int i = 0; i < kNumSubscribers; ++i) { + auto id = room->addOnDataFrameCallback( + kProducerIdentity, kTrackName, + [i](const std::vector &payload, + std::optional /*user_timestamp*/) { + std::string text(payload.begin(), payload.end()); + LK_LOG_INFO("[subscriber {}] {}", i, text); + }); + sub_ids.push_back(id); + LK_LOG_INFO("registered subscriber {} (id={})", i, id); + } + + LK_LOG_INFO("listening for data track '{}' from '{}' with {} subscribers; " + "Ctrl-C to exit", + kTrackName, kProducerIdentity, kNumSubscribers); + + while (g_running.load()) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + LK_LOG_INFO("shutting down"); + for (auto id : sub_ids) { + room->removeOnDataFrameCallback(id); + } + room->setDelegate(nullptr); + room.reset(); + livekit::shutdown(); + return 0; +} diff --git a/examples/simple_status/producer.cpp b/examples/simple_status/producer.cpp new file mode 100644 index 00000000..fa7e882c --- /dev/null +++ b/examples/simple_status/producer.cpp @@ -0,0 +1,143 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// Producer participant: publishes a data track named "status" and pushes +/// periodic binary status frames (4 Hz). Use a token whose identity is +/// `producer`. + +#include "livekit/livekit.h" +#include "livekit/lk_log.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace livekit; + +namespace { + +constexpr const char *kTrackName = "status"; + +std::atomic g_running{true}; + +void handleSignal(int) { g_running.store(false); } + +std::string getenvOrEmpty(const char *name) { + const char *v = std::getenv(name); + return v ? std::string(v) : std::string{}; +} + +} // namespace + +int main(int argc, char *argv[]) { + std::string url = getenvOrEmpty("LIVEKIT_URL"); + std::string token = getenvOrEmpty("LIVEKIT_TOKEN"); + + if (argc >= 3) { + url = argv[1]; + token = argv[2]; + } + + if (url.empty() || token.empty()) { + LK_LOG_ERROR("LIVEKIT_URL and LIVEKIT_TOKEN (or ) are " + "required"); + return 1; + } + + std::signal(SIGINT, handleSignal); +#ifdef SIGTERM + std::signal(SIGTERM, handleSignal); +#endif + + livekit::initialize(livekit::LogLevel::Info, livekit::LogSink::kConsole); + + auto room = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + options.dynacast = false; + + if (!room->Connect(url, token, options)) { + LK_LOG_ERROR("Failed to connect to room"); + livekit::shutdown(); + return 1; + } + + LocalParticipant *lp = room->localParticipant(); + if (!lp) { + LK_LOG_ERROR("No local participant after connect"); + room->setDelegate(nullptr); + room.reset(); + livekit::shutdown(); + return 1; + } + + LK_LOG_INFO("producer connected as identity='{}' room='{}'", lp->identity(), + room->room_info().name); + + std::shared_ptr data_track; + try { + data_track = lp->publishDataTrack(kTrackName); + } catch (const std::exception &e) { + LK_LOG_ERROR("Failed to publish data track: {}", e.what()); + room->setDelegate(nullptr); + room.reset(); + livekit::shutdown(); + return 1; + } + + LK_LOG_INFO("published data track '{}'", kTrackName); + + using clock = std::chrono::steady_clock; + const auto start = clock::now(); + const auto period = std::chrono::milliseconds(250); + auto next_deadline = clock::now(); + std::uint64_t count = 0; + + while (g_running.load()) { + const auto now = clock::now(); + const double elapsed_sec = + std::chrono::duration(now - start).count(); + + std::ostringstream body; + body << std::fixed << std::setprecision(2) << elapsed_sec; + const std::string text = std::string("[time-since-start]: ") + body.str() + + " count: " + std::to_string(count); + + std::vector payload(text.begin(), text.end()); + if (!data_track->tryPush(payload)) { + LK_LOG_WARN("Failed to push data frame"); + } + + LK_LOG_DEBUG("sent: {}", text); + ++count; + + next_deadline += period; + std::this_thread::sleep_until(next_deadline); + } + + LK_LOG_INFO("shutting down"); + data_track->unpublishDataTrack(); + room->setDelegate(nullptr); + room.reset(); + livekit::shutdown(); + return 0; +} diff --git a/examples/tokens/README.md b/examples/tokens/README.md new file mode 100644 index 00000000..ebed99c1 --- /dev/null +++ b/examples/tokens/README.md @@ -0,0 +1,8 @@ +# Overview +Examples of generating tokens + +## gen_and_set.bash +Generate tokens and then set them as env vars for the current terminal session + +## set_data_track_test_tokens.bash +Generate tokens for data track integration tests and set them as env vars for the current terminal session. \ No newline at end of file diff --git a/examples/tokens/gen_and_set.bash b/examples/tokens/gen_and_set.bash new file mode 100755 index 00000000..b933a24f --- /dev/null +++ b/examples/tokens/gen_and_set.bash @@ -0,0 +1,169 @@ +#!/usr/bin/env bash +# Copyright 2026 LiveKit, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Generate a LiveKit access token via `lk` and set LIVEKIT_TOKEN (and LIVEKIT_URL) +# for your current shell session. +# +# source examples/tokens/gen_and_set.bash --id PARTICIPANT_ID --room ROOM_NAME [--view-token] +# eval "$(bash examples/tokens/gen_and_set.bash --id ID --room ROOM [--view-token])" +# +# Optional env: LIVEKIT_API_KEY, LIVEKIT_API_SECRET, LIVEKIT_VALID_FOR. + +# When sourced, we must NOT enable errexit/pipefail on the interactive shell — a +# failing pipeline (e.g. sed|head SIGPIPE) or any error would close your terminal. + +_sourced=0 +if [[ -n "${BASH_VERSION:-}" ]] && [[ "${BASH_SOURCE[0]}" != "${0}" ]]; then + _sourced=1 +elif [[ -n "${ZSH_VERSION:-}" ]] && [[ "${ZSH_EVAL_CONTEXT:-}" == *:file* ]]; then + _sourced=1 +fi + +_fail() { + echo "gen_and_set.bash: $1" >&2 + if [[ "$_sourced" -eq 1 ]]; then + return "${2:-1}" + fi + exit "${2:-1}" +} + +_usage() { + echo "Usage: ${0##*/} --id PARTICIPANT_IDENTITY --room ROOM_NAME [--view-token]" >&2 + echo " --id LiveKit participant identity (required)" >&2 + echo " --room Room name (required; not read from env)" >&2 + echo " --view-token Print the JWT to stderr after generating" >&2 +} + +if [[ "$_sourced" -eq 0 ]]; then + set -euo pipefail +fi + +_view_token=0 +LIVEKIT_IDENTITY="" +LIVEKIT_ROOM="robo_room" +while [[ $# -gt 0 ]]; do + case "$1" in + --view-token) + _view_token=1 + shift + ;; + --id) + if [[ $# -lt 2 ]]; then + _usage + _fail "--id requires a value" 2 + fi + LIVEKIT_IDENTITY="$2" + shift 2 + ;; + --room) + if [[ $# -lt 2 ]]; then + _usage + _fail "--room requires a value" 2 + fi + LIVEKIT_ROOM="$2" + shift 2 + ;; + -h | --help) + _usage + if [[ "$_sourced" -eq 1 ]]; then + return 0 + fi + exit 0 + ;; + *) + _usage + _fail "unknown argument: $1" 2 + ;; + esac +done + +if [[ -z "$LIVEKIT_IDENTITY" ]]; then + _usage + _fail "--id is required" 2 +fi +if [[ -z "$LIVEKIT_ROOM" ]]; then + _usage + _fail "--room is required" 2 +fi + +LIVEKIT_API_KEY="${LIVEKIT_API_KEY:-devkey}" +LIVEKIT_API_SECRET="${LIVEKIT_API_SECRET:-secret}" +LIVEKIT_VALID_FOR="${LIVEKIT_VALID_FOR:-99999h}" +_grant_json='{"canPublish":true,"canSubscribe":true,"canPublishData":true}' + +if ! command -v lk >/dev/null 2>&1; then + _fail "'lk' CLI not found. Install: https://docs.livekit.io/home/cli/" 2 +fi + +# Run lk inside bash so --grant JSON (with embedded ") is safe when this file is +# sourced from zsh; zsh misparses --grant "$json" on the same line. +_out="$( + bash -c ' + lk token create \ + --api-key "$1" \ + --api-secret "$2" \ + -i "$3" \ + --join \ + --valid-for "$4" \ + --room "$5" \ + --grant "$6" 2>&1 + ' _ "$LIVEKIT_API_KEY" "$LIVEKIT_API_SECRET" "$LIVEKIT_IDENTITY" \ + "$LIVEKIT_VALID_FOR" "$LIVEKIT_ROOM" "$_grant_json" +)" +_lk_st=$? +if [[ "$_lk_st" -ne 0 ]]; then + echo "$_out" >&2 + _fail "lk token create failed" 1 +fi + +# Avoid sed|head pipelines (pipefail + SIGPIPE can kill a sourced shell). +LIVEKIT_TOKEN="" +LIVEKIT_URL="" +while IFS= read -r _line || [[ -n "${_line}" ]]; do + if [[ "$_line" == "Access token: "* ]]; then + LIVEKIT_TOKEN="${_line#Access token: }" + elif [[ "$_line" == "Project URL: "* ]]; then + LIVEKIT_URL="${_line#Project URL: }" + fi +done <<< "$_out" + +if [[ -z "$LIVEKIT_TOKEN" ]]; then + echo "gen_and_set.bash: could not parse Access token from lk output:" >&2 + echo "$_out" >&2 + _fail "missing Access token line" 1 +fi + +if [[ "$_view_token" -eq 1 ]]; then + echo "$LIVEKIT_TOKEN" >&2 +fi + +_apply() { + export LIVEKIT_TOKEN + export LIVEKIT_URL +} + +_emit_eval() { + printf 'export LIVEKIT_TOKEN=%q\n' "$LIVEKIT_TOKEN" + [[ -n "$LIVEKIT_URL" ]] && printf 'export LIVEKIT_URL=%q\n' "$LIVEKIT_URL" +} + +if [[ "$_sourced" -eq 1 ]]; then + _apply + echo "LIVEKIT_TOKEN and LIVEKIT_URL set for this shell." >&2 + [[ -n "$LIVEKIT_URL" ]] || echo "gen_and_set.bash: warning: no Project URL in output; set LIVEKIT_URL manually." >&2 +else + _emit_eval + echo "gen_and_set.bash: for this shell run: source $0 --id ... --room ... or: eval \"\$(bash $0 ...)\"" >&2 +fi diff --git a/examples/tokens/set_data_track_test_tokens.bash b/examples/tokens/set_data_track_test_tokens.bash new file mode 100755 index 00000000..1cc8bb56 --- /dev/null +++ b/examples/tokens/set_data_track_test_tokens.bash @@ -0,0 +1,126 @@ +#!/usr/bin/env bash +# Copyright 2026 LiveKit, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Generate two LiveKit access tokens via `lk` and set the environment variables +# required by src/tests/integration/test_data_track.cpp. +# +# source examples/tokens/set_data_track_test_tokens.bash +# eval "$(bash examples/tokens/set_data_track_test_tokens.bash)" +# +# Exports: +# LK_TOKEN_TEST_A +# LK_TOKEN_TEST_B +# LIVEKIT_URL=ws://localhost:7880 +# + +_sourced=0 +if [[ -n "${BASH_VERSION:-}" ]] && [[ "${BASH_SOURCE[0]}" != "${0}" ]]; then + _sourced=1 +elif [[ -n "${ZSH_VERSION:-}" ]] && [[ "${ZSH_EVAL_CONTEXT:-}" == *:file* ]]; then + _sourced=1 +fi + +_fail() { + echo "set_data_track_test_tokens.bash: $1" >&2 + if [[ "$_sourced" -eq 1 ]]; then + return "${2:-1}" + fi + exit "${2:-1}" +} + +if [[ "$_sourced" -eq 0 ]]; then + set -euo pipefail +fi + +LIVEKIT_ROOM="cpp_data_track_test" +LIVEKIT_IDENTITY_A="cpp-test-a" +LIVEKIT_IDENTITY_B="cpp-test-b" + +if [[ $# -ne 0 ]]; then + _fail "this script is hard-coded and does not accept arguments" 2 +fi + +LIVEKIT_API_KEY="devkey" +LIVEKIT_API_SECRET="secret" +LIVEKIT_VALID_FOR="99999h" +LIVEKIT_URL="ws://localhost:7880" +_grant_json='{"canPublish":true,"canSubscribe":true,"canPublishData":true}' + +if ! command -v lk >/dev/null 2>&1; then + _fail "'lk' CLI not found. Install: https://docs.livekit.io/home/cli/" 2 +fi + +_create_token() { + local identity="$1" + local output="" + local command_status=0 + local token="" + + output="$( + bash -c ' + lk token create \ + --api-key "$1" \ + --api-secret "$2" \ + -i "$3" \ + --join \ + --valid-for "$4" \ + --room "$5" \ + --grant "$6" 2>&1 + ' _ "$LIVEKIT_API_KEY" "$LIVEKIT_API_SECRET" "$identity" \ + "$LIVEKIT_VALID_FOR" "$LIVEKIT_ROOM" "$_grant_json" + )" + command_status=$? + if [[ "$command_status" -ne 0 ]]; then + echo "$output" >&2 + _fail "lk token create failed for identity '$identity'" 1 + fi + + while IFS= read -r line || [[ -n "${line}" ]]; do + if [[ "$line" == "Access token: "* ]]; then + token="${line#Access token: }" + break + fi + done <<< "$output" + + if [[ -z "$token" ]]; then + echo "$output" >&2 + _fail "could not parse Access token for identity '$identity'" 1 + fi + + printf '%s' "$token" +} + +LK_TOKEN_TEST_A="$(_create_token "$LIVEKIT_IDENTITY_A")" +LK_TOKEN_TEST_B="$(_create_token "$LIVEKIT_IDENTITY_B")" + +_apply() { + export LK_TOKEN_TEST_A + export LK_TOKEN_TEST_B + export LIVEKIT_URL +} + +_emit_eval() { + printf 'export LK_TOKEN_TEST_A=%q\n' "$LK_TOKEN_TEST_A" + printf 'export LK_TOKEN_TEST_B=%q\n' "$LK_TOKEN_TEST_B" + printf 'export LIVEKIT_URL=%q\n' "$LIVEKIT_URL" +} + +if [[ "$_sourced" -eq 1 ]]; then + _apply + echo "LK_TOKEN_TEST_A, LK_TOKEN_TEST_B, and LIVEKIT_URL set for this shell." >&2 +else + _emit_eval + echo "set_data_track_test_tokens.bash: for this shell run: source $0 or: eval \"\$(bash $0 ...)\"" >&2 +fi diff --git a/include/livekit/data_frame.h b/include/livekit/data_frame.h new file mode 100644 index 00000000..6bfa5e0a --- /dev/null +++ b/include/livekit/data_frame.h @@ -0,0 +1,45 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +namespace livekit { + +/** + * A single frame of data published or received on a data track. + * + * Carries an arbitrary binary payload and an optional user-specified + * timestamp. The unit is application-defined; the SDK examples use + * microseconds since the Unix epoch (system_clock). + */ +struct DataFrame { + /** Arbitrary binary payload (the frame contents). */ + std::vector payload; + + /** + * Optional application-defined timestamp. + * + * The proto field is a bare uint64 with no prescribed unit. + * By convention the SDK examples use microseconds since the Unix epoch. + */ + std::optional user_timestamp; +}; + +} // namespace livekit diff --git a/include/livekit/data_track_frame.h b/include/livekit/data_track_frame.h new file mode 100644 index 00000000..6bfa5e0a --- /dev/null +++ b/include/livekit/data_track_frame.h @@ -0,0 +1,45 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +namespace livekit { + +/** + * A single frame of data published or received on a data track. + * + * Carries an arbitrary binary payload and an optional user-specified + * timestamp. The unit is application-defined; the SDK examples use + * microseconds since the Unix epoch (system_clock). + */ +struct DataFrame { + /** Arbitrary binary payload (the frame contents). */ + std::vector payload; + + /** + * Optional application-defined timestamp. + * + * The proto field is a bare uint64 with no prescribed unit. + * By convention the SDK examples use microseconds since the Unix epoch. + */ + std::optional user_timestamp; +}; + +} // namespace livekit diff --git a/include/livekit/data_track_info.h b/include/livekit/data_track_info.h new file mode 100644 index 00000000..45c4fc5f --- /dev/null +++ b/include/livekit/data_track_info.h @@ -0,0 +1,40 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace livekit { + +/** + * Metadata about a published data track. + * + * Unlike audio/video tracks, data tracks are not part of the Track class + * hierarchy. They carry their own lightweight info struct. + */ +struct DataTrackInfo { + /** Publisher-assigned track name (unique per publisher). */ + std::string name; + + /** SFU-assigned track identifier. */ + std::string sid; + + /** Whether frames on this track use end-to-end encryption. */ + bool uses_e2ee = false; +}; + +} // namespace livekit diff --git a/include/livekit/data_track_subscription.h b/include/livekit/data_track_subscription.h new file mode 100644 index 00000000..4f3619f3 --- /dev/null +++ b/include/livekit/data_track_subscription.h @@ -0,0 +1,123 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "livekit/data_frame.h" +#include "livekit/ffi_handle.h" + +#include +#include +#include +#include +#include +#include + +namespace livekit { + +namespace proto { +class FfiEvent; +} + +/** + * An active subscription to a remote data track. + * + * Provides a blocking read() interface similar to AudioStream / VideoStream. + * Frames are delivered via FfiEvent callbacks and stored internally. + * + * Dropping (destroying) the subscription automatically unsubscribes from the + * remote track by releasing the underlying FFI handle. + * + * Typical usage: + * + * auto sub = remoteDataTrack->subscribe(); + * DataFrame frame; + * while (sub->read(frame)) { + * // process frame.payload + * } + */ +class DataTrackSubscription { +public: + struct Options { + /// Maximum frames buffered on the Rust side. Rust defaults to 16. + std::optional buffer_size{std::nullopt}; + }; + + virtual ~DataTrackSubscription(); + + DataTrackSubscription(const DataTrackSubscription &) = delete; + DataTrackSubscription &operator=(const DataTrackSubscription &) = delete; + DataTrackSubscription(DataTrackSubscription &&) noexcept; + DataTrackSubscription &operator=(DataTrackSubscription &&) noexcept; + + /** + * Blocking read: waits until a DataFrame is available, or the + * subscription reaches EOS / is closed. + * + * @param out On success, filled with the next data frame. + * @return true if a frame was delivered; false if the subscription ended. + */ + bool read(DataFrame &out); + + /** + * End the subscription early. + * + * Releases the FFI handle (which unsubscribes from the remote track), + * unregisters the event listener, and wakes any blocking read(). + */ + void close(); + +private: + friend class RemoteDataTrack; + + DataTrackSubscription() = default; + /// Internal init helper, called by RemoteDataTrack. + void init(FfiHandle subscription_handle); + + /// FFI event handler, called by FfiClient. + void onFfiEvent(const proto::FfiEvent &event); + + /// Push a received DataFrame to the internal storage. + void pushFrame(DataFrame &&frame); + + /// Push an end-of-stream signal (EOS). + void pushEos(); + + /** Protects all mutable state below. */ + mutable std::mutex mutex_; + + /** Signalled when a frame is pushed or the subscription ends. */ + std::condition_variable cv_; + + /** Received frame awaiting read(). + NOTE: the rust side handles buffering, so we should only really ever have one + item*/ + std::optional frame_; + + /** True once the remote side signals end-of-stream. */ + bool eof_{false}; + + /** True after close() has been called by the consumer. */ + bool closed_{false}; + + /** RAII handle for the Rust-owned subscription resource. */ + FfiHandle subscription_handle_; + + /** FfiClient listener id for routing FfiEvent callbacks to this object. */ + std::int64_t listener_id_{0}; +}; + +} // namespace livekit diff --git a/include/livekit/local_data_track.h b/include/livekit/local_data_track.h new file mode 100644 index 00000000..2375d728 --- /dev/null +++ b/include/livekit/local_data_track.h @@ -0,0 +1,112 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "livekit/data_frame.h" +#include "livekit/data_track_info.h" +#include "livekit/ffi_handle.h" + +#include +#include +#include +#include +#include + +namespace livekit { + +namespace proto { +class OwnedLocalDataTrack; +} + +/** + * Represents a locally published data track. + * + * Unlike audio/video tracks, data tracks do not extend the Track base class. + * They use a separate publish/unpublish lifecycle and carry arbitrary binary + * frames instead of media. + * + * Created via LocalParticipant::publishDataTrack(). + * + * Typical usage: + * + * auto lp = room->localParticipant(); + * auto dt = lp->publishDataTrack("sensor-data"); + * DataFrame frame; + * frame.payload = {0x01, 0x02, 0x03}; + * dt->tryPush(frame); + * dt->unpublishDataTrack(); + */ +class LocalDataTrack { +public: + ~LocalDataTrack() = default; + + LocalDataTrack(const LocalDataTrack &) = delete; + LocalDataTrack &operator=(const LocalDataTrack &) = delete; + + /// Metadata about this data track. + const DataTrackInfo &info() const noexcept { return info_; } + + /** + * Try to push a frame to all subscribers of this track. + * + * @return true on success, false if the push failed (e.g. back-pressure + * or the track has been unpublished). + */ + bool tryPush(const DataFrame &frame); + + /** + * Try to push a frame to all subscribers of this track. + * + * @return true on success, false if the push failed (e.g. back-pressure + * or the track has been unpublished). + */ + bool tryPush(const std::vector &payload, + std::optional user_timestamp = std::nullopt); + /** + * Try to push a frame to all subscribers of this track. + * + * @return true on success, false if the push failed (e.g. back-pressure + * or the track has been unpublished). + */ + bool tryPush(const std::uint8_t *data, std::size_t size, + std::optional user_timestamp = std::nullopt); + + /// Whether the track is still published in the room. + bool isPublished() const; + + /** + * Unpublish this data track from the room. + * + * After this call, tryPush() fails and the track cannot be re-published. + */ + void unpublishDataTrack(); + +private: + friend class LocalParticipant; + + explicit LocalDataTrack(const proto::OwnedLocalDataTrack &owned); + + uintptr_t ffi_handle_id() const noexcept { return handle_.get(); } + + /** RAII wrapper for the Rust-owned FFI resource. */ + FfiHandle handle_; + + /** Metadata snapshot taken at construction time. */ + DataTrackInfo info_; +}; + +} // namespace livekit diff --git a/include/livekit/local_participant.h b/include/livekit/local_participant.h index edd7c945..fed3a190 100644 --- a/include/livekit/local_participant.h +++ b/include/livekit/local_participant.h @@ -18,6 +18,7 @@ #include "livekit/ffi_handle.h" #include "livekit/local_audio_track.h" +#include "livekit/local_data_track.h" #include "livekit/local_video_track.h" #include "livekit/participant.h" #include "livekit/room_event_types.h" @@ -101,7 +102,13 @@ class LocalParticipant : public Participant { const std::string &topic = {}); /** - * Publish SIP DTMF message. + * Publish a SIP DTMF (phone keypad) tone into the room. + * + * Only meaningful when a SIP trunk is bridging a phone call into the + * room. See SipDtmfData for background on SIP and DTMF. + * + * @param code DTMF code (0-15). + * @param digit Human-readable digit string (e.g. "5", "#"). */ void publishDtmf(int code, const std::string &digit); @@ -164,6 +171,31 @@ class LocalParticipant : public Participant { */ void unpublishTrack(const std::string &track_sid); + /** + * Publish a data track to the room. + * + * Data tracks carry arbitrary binary frames and are independent of the + * audio/video track hierarchy. The returned LocalDataTrack can push + * frames via tryPush() and be unpublished via + * LocalDataTrack::unpublishDataTrack() or + * LocalParticipant::unpublishDataTrack(). + * + * @param name Unique track name visible to other participants. + * @return Shared pointer to the published data track. + * @throws std::runtime_error on FFI or publish failure. + */ + std::shared_ptr publishDataTrack(const std::string &name); + + /** + * Unpublish a data track from the room. + * + * Delegates to LocalDataTrack::unpublishDataTrack(). After this call, + * tryPush() on the track will fail and the track cannot be re-published. + * + * @param track The data track to unpublish. Null is ignored. + */ + void unpublishDataTrack(const std::shared_ptr &track); + /** * Initiate an RPC call to a remote participant. * @@ -244,6 +276,7 @@ class LocalParticipant : public Participant { /// cached publication). \c mutable so \ref trackPublications() const can /// prune expired \c weak_ptr entries. mutable TrackMap published_tracks_by_sid_; + std::unordered_map rpc_handlers_; // Shared state for RPC invocation tracking. Using shared_ptr so the state diff --git a/include/livekit/remote_data_track.h b/include/livekit/remote_data_track.h new file mode 100644 index 00000000..ce755a06 --- /dev/null +++ b/include/livekit/remote_data_track.h @@ -0,0 +1,93 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "livekit/data_track_info.h" +#include "livekit/data_track_subscription.h" +#include "livekit/ffi_handle.h" + +#include +#include + +namespace livekit { + +namespace proto { +class OwnedRemoteDataTrack; +} + +/** + * Represents a data track published by a remote participant. + * + * Discovered via the DataTrackPublishedEvent room event. Unlike + * audio/video tracks, remote data tracks require an explicit subscribe() + * call to begin receiving frames. + * + * Typical usage: + * + * // In RoomDelegate::onDataTrackPublished callback: + * auto sub = remoteDataTrack->subscribe(); + * DataFrame frame; + * while (sub->read(frame)) { + * // process frame + * } + */ +class RemoteDataTrack { +public: + ~RemoteDataTrack() = default; + + RemoteDataTrack(const RemoteDataTrack &) = delete; + RemoteDataTrack &operator=(const RemoteDataTrack &) = delete; + + /// Metadata about this data track. + const DataTrackInfo &info() const noexcept { return info_; } + + /// Identity of the remote participant who published this track. + const std::string &publisherIdentity() const noexcept { + return publisher_identity_; + } + + /// Whether the track is still published by the remote participant. + bool isPublished() const; + + /** + * Subscribe to this remote data track. + * + * Returns a DataTrackSubscription that delivers frames via blocking + * read(). Destroy the subscription to unsubscribe. + * + * @throws std::runtime_error if the FFI subscribe call fails. + */ + std::shared_ptr + subscribe(const DataTrackSubscription::Options &options = {}); + +private: + friend class Room; + + explicit RemoteDataTrack(const proto::OwnedRemoteDataTrack &owned); + + uintptr_t ffi_handle_id() const noexcept { return handle_.get(); } + /** RAII wrapper for the Rust-owned FFI resource. */ + FfiHandle handle_; + + /** Metadata snapshot taken at construction time. */ + DataTrackInfo info_; + + /** Identity string of the remote participant who published this track. */ + std::string publisher_identity_; +}; + +} // namespace livekit diff --git a/include/livekit/room.h b/include/livekit/room.h index d808ecd4..c8e501d3 100644 --- a/include/livekit/room.h +++ b/include/livekit/room.h @@ -241,62 +241,74 @@ class Room { // --------------------------------------------------------------- /** - * Set a callback for audio frames from a specific remote participant and - * track source. - * - * A dedicated reader thread is spawned for each (participant, source) pair - * when the track is subscribed. If the track is already subscribed, the - * reader starts immediately. If not, it starts when the track arrives. - * - * Only one callback may exist per (participant, source) pair. Re-calling - * with the same pair replaces the previous callback. - * - * @param participant_identity Identity of the remote participant. - * @param source Track source (e.g. SOURCE_MICROPHONE). - * @param callback Function invoked per audio frame. - * @param opts AudioStream options (capacity, noise - * cancellation). + * @brief Sets the audio frame callback via SubscriptionThreadDispatcher. */ void setOnAudioFrameCallback(const std::string &participant_identity, TrackSource source, AudioFrameCallback callback, AudioStream::Options opts = {}); /** - * Set a callback for video frames from a specific remote participant and - * track source. - * - * @see setOnAudioFrameCallback for threading and lifecycle semantics. - * - * @param participant_identity Identity of the remote participant. - * @param source Track source (e.g. SOURCE_CAMERA). - * @param callback Function invoked per video frame. - * @param opts VideoStream options (capacity, pixel format). + * @brief Sets the audio frame callback via SubscriptionThreadDispatcher. + */ + void setOnAudioFrameCallback(const std::string &participant_identity, + const std::string &track_name, + AudioFrameCallback callback, + AudioStream::Options opts = {}); + + /** + * @brief Sets the video frame callback via SubscriptionThreadDispatcher. */ void setOnVideoFrameCallback(const std::string &participant_identity, TrackSource source, VideoFrameCallback callback, VideoStream::Options opts = {}); /** - * Clear the audio frame callback for a specific (participant, source) pair. - * Stops and joins any active reader thread. - * No-op if no callback is registered for this key. - * @param participant_identity Identity of the remote participant. - * @param source Track source (e.g. SOURCE_MICROPHONE). + * @brief Sets the video frame callback via SubscriptionThreadDispatcher. + */ + void setOnVideoFrameCallback(const std::string &participant_identity, + const std::string &track_name, + VideoFrameCallback callback, + VideoStream::Options opts = {}); + + /** + * @brief Clears the audio frame callback via SubscriptionThreadDispatcher. */ void clearOnAudioFrameCallback(const std::string &participant_identity, TrackSource source); + /** + * @brief Clears the audio frame callback via SubscriptionThreadDispatcher. + */ + void clearOnAudioFrameCallback(const std::string &participant_identity, + const std::string &track_name); /** - * Clear the video frame callback for a specific (participant, source) pair. - * Stops and joins any active reader thread. - * No-op if no callback is registered for this key. - * @param participant_identity Identity of the remote participant. - * @param source Track source (e.g. SOURCE_CAMERA). + * @brief Clears the video frame callback via SubscriptionThreadDispatcher. */ void clearOnVideoFrameCallback(const std::string &participant_identity, TrackSource source); + /** + * @brief Clears the video frame callback via SubscriptionThreadDispatcher. + */ + void clearOnVideoFrameCallback(const std::string &participant_identity, + const std::string &track_name); + + /** + * @brief Adds a data frame callback via SubscriptionThreadDispatcher. + */ + DataFrameCallbackId + addOnDataFrameCallback(const std::string &participant_identity, + const std::string &track_name, + DataFrameCallback callback); + + /** + * @brief Removes the data frame callback via SubscriptionThreadDispatcher. + */ + void removeOnDataFrameCallback(DataFrameCallbackId id); + private: + friend class RoomCallbackTest; + mutable std::mutex lock_; ConnectionState connection_state_ = ConnectionState::Disconnected; RoomDelegate *delegate_ = nullptr; // Not owned diff --git a/include/livekit/room_delegate.h b/include/livekit/room_delegate.h index 04474a9f..2621c92c 100644 --- a/include/livekit/room_delegate.h +++ b/include/livekit/room_delegate.h @@ -287,6 +287,24 @@ class RoomDelegate { */ virtual void onTextStreamOpened(Room &, const TextStreamOpenedEvent &) {} + // ------------------------------------------------------------------ + // Data tracks + // ------------------------------------------------------------------ + + /** + * Called when a remote participant publishes a data track. + * + * Data tracks are independent of the audio/video track hierarchy and + * require an explicit subscribe() call to start receiving frames. + */ + virtual void onDataTrackPublished(Room &, const DataTrackPublishedEvent &) {} + + /** + * Called when a remote participant unpublishes a data track. + */ + virtual void onDataTrackUnpublished(Room &, + const DataTrackUnpublishedEvent &) {} + // ------------------------------------------------------------------ // Participants snapshot // ------------------------------------------------------------------ diff --git a/include/livekit/room_event_types.h b/include/livekit/room_event_types.h index 63c75140..553f79c8 100644 --- a/include/livekit/room_event_types.h +++ b/include/livekit/room_event_types.h @@ -29,6 +29,7 @@ namespace livekit { class Track; class Participant; class RemoteParticipant; +class RemoteDataTrack; class LocalTrackPublication; class RemoteTrackPublication; class TrackPublication; @@ -100,7 +101,7 @@ enum class DisconnectReason { RoomClosed, UserUnavailable, UserRejected, - SipTrunkFailure, + SipTrunkFailure, ///< SIP (telephony) trunk connection failed ConnectionTimeout, MediaFailure }; @@ -117,10 +118,17 @@ struct UserPacketData { }; /** - * SIP DTMF payload carried via data packets. + * SIP (Session Initiation Protocol) DTMF payload carried via data packets. + * + * SIP is a signalling protocol used in VoIP telephony. LiveKit supports + * SIP trunking, which bridges traditional phone calls into LiveKit rooms. + * DTMF (Dual-Tone Multi-Frequency) tones are the signals generated when + * phone keypad buttons are pressed (0-9, *, #). This struct surfaces + * those tones so that applications handling SIP-bridged calls can react + * to caller input (e.g. IVR menu selection). */ struct SipDtmfData { - /** DTMF code value. */ + /** Numeric DTMF code (0-15, mapping to 0-9, *, #, A-D). */ std::uint32_t code = 0; /** Human-readable digit representation (e.g. "1", "#"). */ @@ -719,4 +727,24 @@ struct E2eeStateChangedEvent { EncryptionState state = EncryptionState::New; }; +/** + * Fired when a participant publishes a data track. + * + * Data tracks are independent of the audio/video track hierarchy. + * The application must call RemoteDataTrack::subscribe() to start + * receiving frames. + */ +struct DataTrackPublishedEvent { + /** The newly published remote data track. */ + std::shared_ptr track; +}; + +/** + * Fired when a remote participant unpublishes a data track. + */ +struct DataTrackUnpublishedEvent { + /** SID of the track that was unpublished. */ + std::string sid; +}; + } // namespace livekit diff --git a/include/livekit/subscription_thread_dispatcher.h b/include/livekit/subscription_thread_dispatcher.h index 3e843541..f7795fc2 100644 --- a/include/livekit/subscription_thread_dispatcher.h +++ b/include/livekit/subscription_thread_dispatcher.h @@ -24,13 +24,17 @@ #include #include #include +#include #include #include #include +#include namespace livekit { class AudioFrame; +class DataTrackSubscription; +class RemoteDataTrack; class Track; class VideoFrame; @@ -43,6 +47,18 @@ using AudioFrameCallback = std::function; using VideoFrameCallback = std::function; +/// Callback type for incoming data track frames. +/// Invoked on a dedicated reader thread per subscription. +/// @param payload Raw binary data received. +/// @param user_timestamp Optional application-defined timestamp from sender. +using DataFrameCallback = + std::function &payload, + std::optional user_timestamp)>; + +/// Opaque identifier returned by addOnDataFrameCallback, used to remove an +/// individual subscription via removeOnDataFrameCallback. +using DataFrameCallbackId = std::uint64_t; + /** * Owns subscription callback registration and per-subscription reader threads. * @@ -90,6 +106,24 @@ class SubscriptionThreadDispatcher { TrackSource source, AudioFrameCallback callback, AudioStream::Options opts = {}); + /** + * Register or replace an audio frame callback for a remote subscription. + * + * The callback is keyed by remote participant identity plus \p track_name. + * If the matching remote audio track is already subscribed, \ref Room may + * immediately call \ref handleTrackSubscribed to start a reader. + * + * @param participant_identity Identity of the remote participant. + * @param track_name Track name to match. + * @param callback Function invoked for each decoded audio frame. + * @param opts Options used when creating the backing + * \ref AudioStream. + */ + void setOnAudioFrameCallback(const std::string &participant_identity, + const std::string &track_name, + AudioFrameCallback callback, + AudioStream::Options opts = {}); + /** * Register or replace a video frame callback for a remote subscription. * @@ -107,6 +141,24 @@ class SubscriptionThreadDispatcher { TrackSource source, VideoFrameCallback callback, VideoStream::Options opts = {}); + /** + * Register or replace a video frame callback for a remote subscription. + * + * The callback is keyed by remote participant identity plus \p track_name. + * If the matching remote video track is already subscribed, \ref Room may + * immediately call \ref handleTrackSubscribed to start a reader. + * + * @param participant_identity Identity of the remote participant. + * @param track_name Track name to match. + * @param callback Function invoked for each decoded video frame. + * @param opts Options used when creating the backing + * \ref VideoStream. + */ + void setOnVideoFrameCallback(const std::string &participant_identity, + const std::string &track_name, + VideoFrameCallback callback, + VideoStream::Options opts = {}); + /** * Remove an audio callback registration and stop any active reader. * @@ -119,6 +171,18 @@ class SubscriptionThreadDispatcher { void clearOnAudioFrameCallback(const std::string &participant_identity, TrackSource source); + /** + * Remove an audio callback registration and stop any active reader. + * + * If an audio reader thread is active for the given key, its stream is + * closed and the thread is joined before this call returns. + * + * @param participant_identity Identity of the remote participant. + * @param track_name Track name to clear. + */ + void clearOnAudioFrameCallback(const std::string &participant_identity, + const std::string &track_name); + /** * Remove a video callback registration and stop any active reader. * @@ -131,6 +195,18 @@ class SubscriptionThreadDispatcher { void clearOnVideoFrameCallback(const std::string &participant_identity, TrackSource source); + /** + * Remove a video callback registration and stop any active reader. + * + * If a video reader thread is active for the given key, its stream is + * closed and the thread is joined before this call returns. + * + * @param participant_identity Identity of the remote participant. + * @param track_name Track name to clear. + */ + void clearOnVideoFrameCallback(const std::string &participant_identity, + const std::string &track_name); + /** * Start or restart reader dispatch for a newly subscribed remote track. * @@ -146,7 +222,7 @@ class SubscriptionThreadDispatcher { * @param track Subscribed remote track to read from. */ void handleTrackSubscribed(const std::string &participant_identity, - TrackSource source, + TrackSource source, const std::string &track_name, const std::shared_ptr &track); /** @@ -159,9 +235,68 @@ class SubscriptionThreadDispatcher { * * @param participant_identity Identity of the remote participant. * @param source Track source associated with the subscription. + * @param track_name Track name associated with the subscription. */ void handleTrackUnsubscribed(const std::string &participant_identity, - TrackSource source); + TrackSource source, + const std::string &track_name); + + // --------------------------------------------------------------- + // Data track callbacks + // --------------------------------------------------------------- + + /** + * Add a callback for data frames from a specific remote participant's + * data track. + * + * Multiple callbacks may be registered for the same (participant, + * track_name) pair; each one creates an independent FFI subscription. + * + * The callback fires on a dedicated background thread. If the remote + * data track has not yet been published, the callback is stored and + * auto-wired when the track appears (via handleDataTrackPublished). + * + * @param participant_identity Identity of the remote participant. + * @param track_name Name of the remote data track. + * @param callback Function to invoke per data frame. + * @return An opaque ID that can later be passed to + * removeOnDataFrameCallback() to tear down this subscription. + */ + DataFrameCallbackId + addOnDataFrameCallback(const std::string &participant_identity, + const std::string &track_name, + DataFrameCallback callback); + + /** + * Remove a data frame callback previously registered via + * addOnDataFrameCallback(). Stops and joins the active reader thread + * for this subscription. + * No-op if the ID is not (or no longer) registered. + * + * @param id The identifier returned by addOnDataFrameCallback(). + */ + void removeOnDataFrameCallback(DataFrameCallbackId id); + + /** + * Notify the dispatcher that a remote data track has been published. + * + * \ref Room calls this when it receives a kDataTrackPublished event. + * For every registered callback whose (participant, track_name) matches, + * a reader thread is launched. + * + * @param track The newly published remote data track. + */ + void handleDataTrackPublished(const std::shared_ptr &track); + + /** + * Notify the dispatcher that a remote data track has been unpublished. + * + * \ref Room calls this when it receives a kDataTrackUnpublished event. + * Any active data reader threads for this track SID are closed and joined. + * + * @param sid The SID of the unpublished data track. + */ + void handleDataTrackUnpublished(const std::string &sid); /** * Stop all readers and clear all callback registrations. @@ -174,14 +309,17 @@ class SubscriptionThreadDispatcher { private: friend class SubscriptionThreadDispatcherTest; - /// Compound lookup key for a remote participant identity and track source. + /// Compound lookup key for callback dispatch: + /// either `(participant, source, "")` or `(participant, SOURCE_UNKNOWN, + /// track_name)`. struct CallbackKey { std::string participant_identity; TrackSource source; + std::string track_name; bool operator==(const CallbackKey &o) const { return participant_identity == o.participant_identity && - source == o.source; + source == o.source && track_name == o.track_name; } }; @@ -190,17 +328,52 @@ class SubscriptionThreadDispatcher { std::size_t operator()(const CallbackKey &k) const { auto h1 = std::hash{}(k.participant_identity); auto h2 = std::hash{}(static_cast(k.source)); - return h1 ^ (h2 << 1); + auto h3 = std::hash{}(k.track_name); + return h1 ^ (h2 << 1) ^ (h3 << 2); } }; - /// Active read-side resources for one subscription dispatch slot. + /// Active read-side resources for one audio/video subscription dispatch slot. struct ActiveReader { std::shared_ptr audio_stream; std::shared_ptr video_stream; std::thread thread; }; + /// Compound lookup key for a remote participant identity and data track name. + struct DataCallbackKey { + std::string participant_identity; + std::string track_name; + + bool operator==(const DataCallbackKey &o) const { + return participant_identity == o.participant_identity && + track_name == o.track_name; + } + }; + + /// Hash function for \ref DataCallbackKey. + struct DataCallbackKeyHash { + std::size_t operator()(const DataCallbackKey &k) const { + auto h1 = std::hash{}(k.participant_identity); + auto h2 = std::hash{}(k.track_name); + return h1 ^ (h2 << 1); + } + }; + + /// Stored data callback registration. + struct RegisteredDataCallback { + DataCallbackKey key; + DataFrameCallback callback; + }; + + /// Active read-side resources for one data track subscription. + struct ActiveDataReader { + std::shared_ptr remote_track; + std::mutex sub_mutex; + std::shared_ptr subscription; // guarded by sub_mutex + std::thread thread; + }; + /// Stored audio callback registration plus stream-construction options. struct RegisteredAudioCallback { AudioFrameCallback callback; @@ -243,21 +416,52 @@ class SubscriptionThreadDispatcher { VideoFrameCallback cb, const VideoStream::Options &opts); + /// Extract and close the data reader for a given callback ID, returning its + /// thread. Must be called with \ref lock_ held. + std::thread extractDataReaderThreadLocked(DataFrameCallbackId id); + + /// Extract and close the data reader for a given (participant, track_name) + /// key, returning its thread. Must be called with \ref lock_ held. + std::thread extractDataReaderThreadLocked(const DataCallbackKey &key); + + /// Start a data reader thread for the given callback ID, key, and track. + /// Must be called with \ref lock_ held. + std::thread + startDataReaderLocked(DataFrameCallbackId id, const DataCallbackKey &key, + const std::shared_ptr &track, + DataFrameCallback cb); + /// Protects callback registration maps and active reader state. mutable std::mutex lock_; - /// Registered audio frame callbacks keyed by `(participant, source)`. + /// Registered audio frame callbacks keyed by \ref CallbackKey. std::unordered_map audio_callbacks_; - /// Registered video frame callbacks keyed by `(participant, source)`. + /// Registered video frame callbacks keyed by \ref CallbackKey. std::unordered_map video_callbacks_; - /// Active stream/thread state keyed by `(participant, source)`. + /// Active stream/thread state keyed by \ref CallbackKey. std::unordered_map active_readers_; + /// Next auto-increment ID for data frame callbacks. + DataFrameCallbackId next_data_callback_id_; + + /// Registered data frame callbacks keyed by opaque callback ID. + std::unordered_map + data_callbacks_; + + /// Active data reader threads keyed by callback ID. + std::unordered_map> + active_data_readers_; + + /// Currently published remote data tracks, keyed by (participant, name). + std::unordered_map, + DataCallbackKeyHash> + remote_data_tracks_; + /// Hard limit on concurrently active per-subscription reader threads. static constexpr int kMaxActiveReaders = 20; }; diff --git a/src/data_track_subscription.cpp b/src/data_track_subscription.cpp new file mode 100644 index 00000000..b92480ab --- /dev/null +++ b/src/data_track_subscription.cpp @@ -0,0 +1,182 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "livekit/data_track_subscription.h" + +#include "data_track.pb.h" +#include "ffi.pb.h" +#include "ffi_client.h" +#include "livekit/lk_log.h" + +#include + +namespace livekit { + +using proto::FfiEvent; + +DataTrackSubscription::~DataTrackSubscription() { close(); } + +DataTrackSubscription::DataTrackSubscription( + DataTrackSubscription &&other) noexcept { + std::lock_guard lock(other.mutex_); + frame_ = std::move(other.frame_); + eof_ = other.eof_; + closed_ = other.closed_; + subscription_handle_ = std::move(other.subscription_handle_); + listener_id_ = other.listener_id_; + + other.listener_id_ = 0; + other.closed_ = true; +} + +DataTrackSubscription & +DataTrackSubscription::operator=(DataTrackSubscription &&other) noexcept { + if (this == &other) { + return *this; + } + + close(); + + { + std::lock_guard lock_this(mutex_); + std::lock_guard lock_other(other.mutex_); + + frame_ = std::move(other.frame_); + eof_ = other.eof_; + closed_ = other.closed_; + subscription_handle_ = std::move(other.subscription_handle_); + listener_id_ = other.listener_id_; + + other.listener_id_ = 0; + other.closed_ = true; + } + + return *this; +} + +void DataTrackSubscription::init(FfiHandle subscription_handle) { + subscription_handle_ = std::move(subscription_handle); + + listener_id_ = FfiClient::instance().AddListener( + [this](const FfiEvent &e) { this->onFfiEvent(e); }); +} + +bool DataTrackSubscription::read(DataFrame &out) { + { + std::lock_guard lock(mutex_); + if (closed_ || eof_) { + return false; + } + } + + // Signal the Rust side that we're ready to receive the next frame. + // The Rust SubscriptionTask uses a demand-driven protocol: it won't pull + // from the underlying stream until notified via this request. + proto::FfiRequest req; + auto *msg = req.mutable_data_track_subscription_read(); + msg->set_subscription_handle( + static_cast(subscription_handle_.get())); + FfiClient::instance().sendRequest(req); + + std::unique_lock lock(mutex_); + cv_.wait(lock, [this] { return frame_.has_value() || eof_ || closed_; }); + + if (closed_ || (!frame_.has_value() && eof_)) { + return false; + } + + out = std::move(frame_.value()); + frame_.reset(); + return true; +} + +void DataTrackSubscription::close() { + { + std::lock_guard lock(mutex_); + if (closed_) { + return; + } + closed_ = true; + } + + if (subscription_handle_.get() != 0) { + subscription_handle_.reset(); + } + + if (listener_id_ != 0) { + FfiClient::instance().RemoveListener(listener_id_); + listener_id_ = 0; + } + + cv_.notify_all(); +} + +void DataTrackSubscription::onFfiEvent(const FfiEvent &event) { + if (event.message_case() != FfiEvent::kDataTrackSubscriptionEvent) { + return; + } + + const auto &dts = event.data_track_subscription_event(); + if (dts.subscription_handle() != + static_cast(subscription_handle_.get())) { + return; + } + + if (dts.has_frame_received()) { + const auto &fr = dts.frame_received().frame(); + DataFrame frame; + const auto &payload_str = fr.payload(); + frame.payload.assign( + reinterpret_cast(payload_str.data()), + reinterpret_cast(payload_str.data()) + + payload_str.size()); + if (fr.has_user_timestamp()) { + frame.user_timestamp = fr.user_timestamp(); + } + pushFrame(std::move(frame)); + } else if (dts.has_eos()) { + pushEos(); + } +} + +void DataTrackSubscription::pushFrame(DataFrame &&frame) { + std::lock_guard lock(mutex_); + + if (closed_ || eof_) { + return; + } + + // rust side handles buffering, so we should only really ever have one item + assert(!frame_.has_value()); + + frame_ = std::move(frame); + + // notify no matter what since we got a new frame + cv_.notify_one(); +} + +void DataTrackSubscription::pushEos() { + { + std::lock_guard lock(mutex_); + if (eof_) { + return; + } + eof_ = true; + } + cv_.notify_all(); +} + +} // namespace livekit diff --git a/src/e2ee.cpp b/src/e2ee.cpp index dc95252f..ae46bf79 100644 --- a/src/e2ee.cpp +++ b/src/e2ee.cpp @@ -166,6 +166,7 @@ void E2EEManager::setEnabled(bool enabled) { req.mutable_e2ee()->set_room_handle(room_handle_); req.mutable_e2ee()->mutable_manager_set_enabled()->set_enabled(enabled); FfiClient::instance().sendRequest(req); + enabled_ = enabled; } E2EEManager::KeyProvider *E2EEManager::keyProvider() { return &key_provider_; } diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index eba73821..78ed9a5b 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -16,6 +16,7 @@ #include +#include "data_track.pb.h" #include "e2ee.pb.h" #include "ffi.pb.h" #include "ffi_client.h" @@ -114,6 +115,12 @@ std::optional ExtractAsyncId(const proto::FfiEvent &event) { case E::kSendBytes: return event.send_bytes().async_id(); + // data track async completions + case E::kPublishDataTrack: + return event.publish_data_track().async_id(); + case E::kSubscribeDataTrack: + return event.subscribe_data_track().async_id(); + // NOT async completion: case E::kRoomEvent: case E::kTrackEvent: @@ -121,6 +128,7 @@ std::optional ExtractAsyncId(const proto::FfiEvent &event) { case E::kAudioStreamEvent: case E::kByteStreamReaderEvent: case E::kTextStreamReaderEvent: + case E::kDataTrackSubscriptionEvent: case E::kRpcMethodInvocation: case E::kLogs: case E::kPanic: @@ -318,6 +326,11 @@ FfiClient::connectAsync(const std::string &url, const std::string &token, opts->set_dynacast(options.dynacast); opts->set_single_peer_connection(options.single_peer_connection); + LK_LOG_DEBUG("[FfiClient] connectAsync: auto_subscribe={}, dynacast={}, " + "single_peer_connection={}", + options.auto_subscribe, options.dynacast, + options.single_peer_connection); + // --- E2EE / encryption (optional) --- if (options.encryption.has_value()) { const E2EEOptions &e2ee = *options.encryption; @@ -608,6 +621,103 @@ std::future FfiClient::publishDataAsync( return fut; } +std::future +FfiClient::publishDataTrackAsync(std::uint64_t local_participant_handle, + const std::string &track_name) { + const AsyncId async_id = generateAsyncId(); + + auto fut = registerAsync( + async_id, + [async_id](const proto::FfiEvent &event) { + return event.has_publish_data_track() && + event.publish_data_track().async_id() == async_id; + }, + [](const proto::FfiEvent &event, + std::promise &pr) { + const auto &cb = event.publish_data_track(); + if (cb.has_error() && !cb.error().empty()) { + pr.set_exception( + std::make_exception_ptr(std::runtime_error(cb.error()))); + return; + } + if (!cb.has_track()) { + pr.set_exception(std::make_exception_ptr( + std::runtime_error("PublishDataTrackCallback missing track"))); + return; + } + proto::OwnedLocalDataTrack track = cb.track(); + pr.set_value(std::move(track)); + }); + + proto::FfiRequest req; + auto *msg = req.mutable_publish_data_track(); + msg->set_local_participant_handle(local_participant_handle); + msg->mutable_options()->set_name(track_name); + msg->set_request_async_id(async_id); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_publish_data_track()) { + logAndThrow("FfiResponse missing publish_data_track"); + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; +} + +std::future +FfiClient::subscribeDataTrackAsync(std::uint64_t track_handle, + std::optional buffer_size) { + const AsyncId async_id = generateAsyncId(); + + auto fut = registerAsync( + async_id, + [async_id](const proto::FfiEvent &event) { + return event.has_subscribe_data_track() && + event.subscribe_data_track().async_id() == async_id; + }, + [](const proto::FfiEvent &event, + std::promise &pr) { + const auto &cb = event.subscribe_data_track(); + if (cb.has_error() && !cb.error().empty()) { + pr.set_exception( + std::make_exception_ptr(std::runtime_error(cb.error()))); + return; + } + if (!cb.has_subscription()) { + pr.set_exception(std::make_exception_ptr(std::runtime_error( + "SubscribeDataTrackCallback missing subscription"))); + return; + } + proto::OwnedDataTrackSubscription sub = cb.subscription(); + pr.set_value(std::move(sub)); + }); + + proto::FfiRequest req; + auto *msg = req.mutable_subscribe_data_track(); + msg->set_track_handle(track_handle); + auto *opts = msg->mutable_options(); + if (buffer_size.has_value()) { + opts->set_buffer_size(buffer_size.value()); + } + msg->set_request_async_id(async_id); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_subscribe_data_track()) { + logAndThrow("FfiResponse missing subscribe_data_track"); + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; +} + std::future FfiClient::publishSipDtmfAsync( std::uint64_t local_participant_handle, std::uint32_t code, const std::string &digit, diff --git a/src/ffi_client.h b/src/ffi_client.h index 667100ea..ff23d090 100644 --- a/src/ffi_client.h +++ b/src/ffi_client.h @@ -18,11 +18,13 @@ #define LIVEKIT_FFI_CLIENT_H #include +#include #include #include #include #include #include +#include #include #include @@ -38,6 +40,8 @@ class FfiEvent; class FfiResponse; class FfiRequest; class OwnedTrackPublication; +class OwnedLocalDataTrack; +class OwnedDataTrackSubscription; class DataStream; } // namespace proto @@ -123,6 +127,14 @@ class FfiClient { const std::string &payload, std::optional response_timeout_ms = std::nullopt); + // Data Track APIs + std::future + publishDataTrackAsync(std::uint64_t local_participant_handle, + const std::string &track_name); + std::future subscribeDataTrackAsync( + std::uint64_t track_handle, + std::optional buffer_size = std::nullopt); + // Data stream functionalities std::future sendStreamHeaderAsync(std::uint64_t local_participant_handle, diff --git a/src/local_data_track.cpp b/src/local_data_track.cpp new file mode 100644 index 00000000..380deb57 --- /dev/null +++ b/src/local_data_track.cpp @@ -0,0 +1,107 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "livekit/local_data_track.h" + +#include "livekit/lk_log.h" + +#include "data_track.pb.h" +#include "ffi.pb.h" +#include "ffi_client.h" + +namespace livekit { + +LocalDataTrack::LocalDataTrack(const proto::OwnedLocalDataTrack &owned) + : handle_(static_cast(owned.handle().id())) { + const auto &pi = owned.info(); + info_.name = pi.name(); + info_.sid = pi.sid(); + info_.uses_e2ee = pi.uses_e2ee(); +} + +bool LocalDataTrack::tryPush(const DataFrame &frame) { + if (!handle_.valid()) { + return false; + } + + proto::FfiRequest req; + auto *msg = req.mutable_local_data_track_try_push(); + msg->set_track_handle(static_cast(handle_.get())); + auto *pf = msg->mutable_frame(); + pf->set_payload(frame.payload.data(), frame.payload.size()); + if (frame.user_timestamp.has_value()) { + pf->set_user_timestamp(frame.user_timestamp.value()); + } + + proto::FfiResponse resp = FfiClient::instance().sendRequest(req); + const auto &r = resp.local_data_track_try_push(); + return !r.has_error(); +} + +bool LocalDataTrack::tryPush(const std::vector &payload, + std::optional user_timestamp) { + DataFrame frame; + frame.payload = payload; + frame.user_timestamp = user_timestamp; + + try { + return tryPush(frame); + } catch (const std::exception &e) { + LK_LOG_ERROR("[LocalDataTrack] tryPush error: {}", e.what()); + return false; + } +} + +bool LocalDataTrack::tryPush(const std::uint8_t *data, std::size_t size, + std::optional user_timestamp) { + DataFrame frame; + frame.payload.assign(data, data + size); + frame.user_timestamp = user_timestamp; + + try { + return tryPush(frame); + } catch (const std::exception &e) { + LK_LOG_ERROR("[LocalDataTrack] tryPush error: {}", e.what()); + return false; + } +} + +bool LocalDataTrack::isPublished() const { + if (!handle_.valid()) { + return false; + } + + proto::FfiRequest req; + auto *msg = req.mutable_local_data_track_is_published(); + msg->set_track_handle(static_cast(handle_.get())); + + proto::FfiResponse resp = FfiClient::instance().sendRequest(req); + return resp.local_data_track_is_published().is_published(); +} + +void LocalDataTrack::unpublishDataTrack() { + if (!handle_.valid()) { + return; + } + + proto::FfiRequest req; + auto *msg = req.mutable_local_data_track_unpublish(); + msg->set_track_handle(static_cast(handle_.get())); + + (void)FfiClient::instance().sendRequest(req); +} + +} // namespace livekit diff --git a/src/local_participant.cpp b/src/local_participant.cpp index 8aea35ff..9b5061e8 100644 --- a/src/local_participant.cpp +++ b/src/local_participant.cpp @@ -18,11 +18,13 @@ #include "livekit/ffi_handle.h" #include "livekit/local_audio_track.h" +#include "livekit/local_data_track.h" #include "livekit/local_track_publication.h" #include "livekit/local_video_track.h" #include "livekit/room_delegate.h" #include "livekit/track.h" +#include "data_track.pb.h" #include "ffi.pb.h" #include "ffi_client.h" #include "participant.pb.h" @@ -286,6 +288,30 @@ LocalParticipant::PublicationMap LocalParticipant::trackPublications() const { return out; } +std::shared_ptr +LocalParticipant::publishDataTrack(const std::string &name) { + auto handle_id = ffiHandleId(); + if (handle_id == 0) { + throw std::runtime_error( + "LocalParticipant::publishDataTrack: invalid FFI handle"); + } + + auto fut = FfiClient::instance().publishDataTrackAsync( + static_cast(handle_id), name); + + proto::OwnedLocalDataTrack owned = fut.get(); + return std::shared_ptr(new LocalDataTrack(owned)); +} + +void LocalParticipant::unpublishDataTrack( + const std::shared_ptr &track) { + if (!track) { + return; + } + + track->unpublishDataTrack(); +} + std::string LocalParticipant::performRpc( const std::string &destination_identity, const std::string &method, const std::string &payload, const std::optional &response_timeout) { diff --git a/src/remote_data_track.cpp b/src/remote_data_track.cpp new file mode 100644 index 00000000..1b58beed --- /dev/null +++ b/src/remote_data_track.cpp @@ -0,0 +1,68 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "livekit/remote_data_track.h" + +#include "data_track.pb.h" +#include "ffi.pb.h" +#include "ffi_client.h" + +#include + +namespace livekit { + +RemoteDataTrack::RemoteDataTrack(const proto::OwnedRemoteDataTrack &owned) + : handle_(static_cast(owned.handle().id())), + publisher_identity_(owned.publisher_identity()) { + const auto &pi = owned.info(); + info_.name = pi.name(); + info_.sid = pi.sid(); + info_.uses_e2ee = pi.uses_e2ee(); +} + +bool RemoteDataTrack::isPublished() const { + if (!handle_.valid()) { + return false; + } + + proto::FfiRequest req; + auto *msg = req.mutable_remote_data_track_is_published(); + msg->set_track_handle(static_cast(handle_.get())); + + proto::FfiResponse resp = FfiClient::instance().sendRequest(req); + return resp.remote_data_track_is_published().is_published(); +} + +std::shared_ptr +RemoteDataTrack::subscribe(const DataTrackSubscription::Options &options) { + if (!handle_.valid()) { + throw std::runtime_error("RemoteDataTrack::subscribe: invalid FFI handle"); + } + + auto fut = FfiClient::instance().subscribeDataTrackAsync( + static_cast(handle_.get()), options.buffer_size); + + proto::OwnedDataTrackSubscription owned_sub = fut.get(); + + FfiHandle sub_handle(static_cast(owned_sub.handle().id())); + + auto subscription = + std::shared_ptr(new DataTrackSubscription()); + subscription->init(std::move(sub_handle)); + return subscription; +} + +} // namespace livekit diff --git a/src/room.cpp b/src/room.cpp index ab7ab286..cd3d6e51 100644 --- a/src/room.cpp +++ b/src/room.cpp @@ -18,15 +18,18 @@ #include "livekit/audio_stream.h" #include "livekit/e2ee.h" +#include "livekit/local_data_track.h" #include "livekit/local_participant.h" #include "livekit/local_track_publication.h" #include "livekit/remote_audio_track.h" +#include "livekit/remote_data_track.h" #include "livekit/remote_participant.h" #include "livekit/remote_track_publication.h" #include "livekit/remote_video_track.h" #include "livekit/room_delegate.h" #include "livekit/room_event_types.h" +#include "data_track.pb.h" #include "ffi.pb.h" #include "ffi_client.h" #include "livekit/lk_log.h" @@ -168,8 +171,8 @@ bool Room::Connect(const std::string &url, const std::string &token, std::unique_ptr new_e2ee_manager; if (options.encryption) { LK_LOG_INFO("creating E2eeManager"); - e2ee_manager_ = std::unique_ptr( - new E2EEManager(room_handle_->get(), options.encryption.value())); + new_e2ee_manager = std::unique_ptr( + new E2EEManager(new_room_handle->get(), options.encryption.value())); } // Publish all state atomically under lock @@ -227,6 +230,11 @@ Room::remoteParticipants() const { return out; } +E2EEManager *Room::e2eeManager() const { + std::lock_guard g(lock_); + return e2ee_manager_.get(); +} + void Room::registerTextStreamHandler(const std::string &topic, TextStreamHandler handler) { std::lock_guard g(lock_); @@ -273,6 +281,17 @@ void Room::setOnAudioFrameCallback(const std::string &participant_identity, } } +void Room::setOnAudioFrameCallback(const std::string &participant_identity, + const std::string &track_name, + AudioFrameCallback callback, + AudioStream::Options opts) { + if (subscription_thread_dispatcher_) { + subscription_thread_dispatcher_->setOnAudioFrameCallback( + participant_identity, track_name, std::move(callback), + std::move(opts)); + } +} + void Room::setOnVideoFrameCallback(const std::string &participant_identity, TrackSource source, VideoFrameCallback callback, @@ -283,6 +302,17 @@ void Room::setOnVideoFrameCallback(const std::string &participant_identity, } } +void Room::setOnVideoFrameCallback(const std::string &participant_identity, + const std::string &track_name, + VideoFrameCallback callback, + VideoStream::Options opts) { + if (subscription_thread_dispatcher_) { + subscription_thread_dispatcher_->setOnVideoFrameCallback( + participant_identity, track_name, std::move(callback), + std::move(opts)); + } +} + void Room::clearOnAudioFrameCallback(const std::string &participant_identity, TrackSource source) { if (subscription_thread_dispatcher_) { @@ -291,6 +321,14 @@ void Room::clearOnAudioFrameCallback(const std::string &participant_identity, } } +void Room::clearOnAudioFrameCallback(const std::string &participant_identity, + const std::string &track_name) { + if (subscription_thread_dispatcher_) { + subscription_thread_dispatcher_->clearOnAudioFrameCallback( + participant_identity, track_name); + } +} + void Room::clearOnVideoFrameCallback(const std::string &participant_identity, TrackSource source) { if (subscription_thread_dispatcher_) { @@ -299,6 +337,31 @@ void Room::clearOnVideoFrameCallback(const std::string &participant_identity, } } +void Room::clearOnVideoFrameCallback(const std::string &participant_identity, + const std::string &track_name) { + if (subscription_thread_dispatcher_) { + subscription_thread_dispatcher_->clearOnVideoFrameCallback( + participant_identity, track_name); + } +} + +DataFrameCallbackId +Room::addOnDataFrameCallback(const std::string &participant_identity, + const std::string &track_name, + DataFrameCallback callback) { + if (subscription_thread_dispatcher_) { + return subscription_thread_dispatcher_->addOnDataFrameCallback( + participant_identity, track_name, std::move(callback)); + } + return std::numeric_limits::max(); +} + +void Room::removeOnDataFrameCallback(DataFrameCallbackId id) { + if (subscription_thread_dispatcher_) { + subscription_thread_dispatcher_->removeOnDataFrameCallback(id); + } +} + void Room::OnEvent(const FfiEvent &event) { // Take a snapshot of the delegate under lock, but do NOT call it under the // lock. @@ -586,7 +649,7 @@ void Room::OnEvent(const FfiEvent &event) { if (subscription_thread_dispatcher_ && remote_track && rpublication) { subscription_thread_dispatcher_->handleTrackSubscribed( - identity, rpublication->source(), remote_track); + identity, rpublication->source(), rpublication->name(), remote_track); } break; } @@ -631,7 +694,11 @@ void Room::OnEvent(const FfiEvent &event) { if (subscription_thread_dispatcher_ && unsub_source != TrackSource::SOURCE_UNKNOWN) { subscription_thread_dispatcher_->handleTrackUnsubscribed(unsub_identity, - unsub_source); + unsub_source, + ev.publication + ? ev.publication + ->name() + : ""); } break; } @@ -656,6 +723,36 @@ void Room::OnEvent(const FfiEvent &event) { } break; } + case proto::RoomEvent::kDataTrackPublished: { + const auto &rdtp = re.data_track_published(); + auto remote_track = + std::shared_ptr(new RemoteDataTrack(rdtp.track())); + + if (subscription_thread_dispatcher_) { + subscription_thread_dispatcher_->handleDataTrackPublished(remote_track); + } + + DataTrackPublishedEvent ev; + ev.track = remote_track; + if (delegate_snapshot) { + delegate_snapshot->onDataTrackPublished(*this, ev); + } + break; + } + case proto::RoomEvent::kDataTrackUnpublished: { + const auto &dtu = re.data_track_unpublished(); + + if (subscription_thread_dispatcher_) { + subscription_thread_dispatcher_->handleDataTrackUnpublished(dtu.sid()); + } + + DataTrackUnpublishedEvent ev; + ev.sid = dtu.sid(); + if (delegate_snapshot) { + delegate_snapshot->onDataTrackUnpublished(*this, ev); + } + break; + } case proto::RoomEvent::kTrackMuted: { TrackMutedEvent ev; bool success = false; diff --git a/src/subscription_thread_dispatcher.cpp b/src/subscription_thread_dispatcher.cpp index a7f9a2a7..6ea50e3c 100644 --- a/src/subscription_thread_dispatcher.cpp +++ b/src/subscription_thread_dispatcher.cpp @@ -16,7 +16,10 @@ #include "livekit/subscription_thread_dispatcher.h" +#include "livekit/data_frame.h" +#include "livekit/data_track_subscription.h" #include "livekit/lk_log.h" +#include "livekit/remote_data_track.h" #include "livekit/track.h" #include @@ -40,7 +43,8 @@ const char *trackKindName(TrackKind kind) { } // namespace -SubscriptionThreadDispatcher::SubscriptionThreadDispatcher() = default; +SubscriptionThreadDispatcher::SubscriptionThreadDispatcher() + : next_data_callback_id_(1) {} SubscriptionThreadDispatcher::~SubscriptionThreadDispatcher() { LK_LOG_DEBUG("Destroying SubscriptionThreadDispatcher"); @@ -50,7 +54,7 @@ SubscriptionThreadDispatcher::~SubscriptionThreadDispatcher() { void SubscriptionThreadDispatcher::setOnAudioFrameCallback( const std::string &participant_identity, TrackSource source, AudioFrameCallback callback, AudioStream::Options opts) { - CallbackKey key{participant_identity, source}; + CallbackKey key{participant_identity, source, ""}; std::lock_guard lock(lock_); const bool replacing = audio_callbacks_.find(key) != audio_callbacks_.end(); audio_callbacks_[key] = @@ -61,10 +65,24 @@ void SubscriptionThreadDispatcher::setOnAudioFrameCallback( audio_callbacks_.size()); } +void SubscriptionThreadDispatcher::setOnAudioFrameCallback( + const std::string &participant_identity, const std::string &track_name, + AudioFrameCallback callback, AudioStream::Options opts) { + CallbackKey key{participant_identity, TrackSource::SOURCE_UNKNOWN, track_name}; + std::lock_guard lock(lock_); + const bool replacing = audio_callbacks_.find(key) != audio_callbacks_.end(); + audio_callbacks_[key] = + RegisteredAudioCallback{std::move(callback), std::move(opts)}; + LK_LOG_DEBUG("Registered audio frame callback for participant={} track_name={} " + "replacing_existing={} total_audio_callbacks={}", + participant_identity, track_name, replacing, + audio_callbacks_.size()); +} + void SubscriptionThreadDispatcher::setOnVideoFrameCallback( const std::string &participant_identity, TrackSource source, VideoFrameCallback callback, VideoStream::Options opts) { - CallbackKey key{participant_identity, source}; + CallbackKey key{participant_identity, source, ""}; std::lock_guard lock(lock_); const bool replacing = video_callbacks_.find(key) != video_callbacks_.end(); video_callbacks_[key] = @@ -75,9 +93,23 @@ void SubscriptionThreadDispatcher::setOnVideoFrameCallback( video_callbacks_.size()); } +void SubscriptionThreadDispatcher::setOnVideoFrameCallback( + const std::string &participant_identity, const std::string &track_name, + VideoFrameCallback callback, VideoStream::Options opts) { + CallbackKey key{participant_identity, TrackSource::SOURCE_UNKNOWN, track_name}; + std::lock_guard lock(lock_); + const bool replacing = video_callbacks_.find(key) != video_callbacks_.end(); + video_callbacks_[key] = + RegisteredVideoCallback{std::move(callback), std::move(opts)}; + LK_LOG_DEBUG("Registered video frame callback for participant={} track_name={} " + "replacing_existing={} total_video_callbacks={}", + participant_identity, track_name, replacing, + video_callbacks_.size()); +} + void SubscriptionThreadDispatcher::clearOnAudioFrameCallback( const std::string &participant_identity, TrackSource source) { - CallbackKey key{participant_identity, source}; + CallbackKey key{participant_identity, source, ""}; std::thread old_thread; bool removed_callback = false; { @@ -95,9 +127,29 @@ void SubscriptionThreadDispatcher::clearOnAudioFrameCallback( } } +void SubscriptionThreadDispatcher::clearOnAudioFrameCallback( + const std::string &participant_identity, const std::string &track_name) { + CallbackKey key{participant_identity, TrackSource::SOURCE_UNKNOWN, track_name}; + std::thread old_thread; + bool removed_callback = false; + { + std::lock_guard lock(lock_); + removed_callback = audio_callbacks_.erase(key) > 0; + old_thread = extractReaderThreadLocked(key); + LK_LOG_DEBUG( + "Clearing audio frame callback for participant={} track_name={} " + "removed_callback={} stopped_reader={} remaining_audio_callbacks={}", + participant_identity, track_name, removed_callback, + old_thread.joinable(), audio_callbacks_.size()); + } + if (old_thread.joinable()) { + old_thread.join(); + } +} + void SubscriptionThreadDispatcher::clearOnVideoFrameCallback( const std::string &participant_identity, TrackSource source) { - CallbackKey key{participant_identity, source}; + CallbackKey key{participant_identity, source, ""}; std::thread old_thread; bool removed_callback = false; { @@ -115,8 +167,29 @@ void SubscriptionThreadDispatcher::clearOnVideoFrameCallback( } } +void SubscriptionThreadDispatcher::clearOnVideoFrameCallback( + const std::string &participant_identity, const std::string &track_name) { + CallbackKey key{participant_identity, TrackSource::SOURCE_UNKNOWN, track_name}; + std::thread old_thread; + bool removed_callback = false; + { + std::lock_guard lock(lock_); + removed_callback = video_callbacks_.erase(key) > 0; + old_thread = extractReaderThreadLocked(key); + LK_LOG_DEBUG( + "Clearing video frame callback for participant={} track_name={} " + "removed_callback={} stopped_reader={} remaining_video_callbacks={}", + participant_identity, track_name, removed_callback, + old_thread.joinable(), video_callbacks_.size()); + } + if (old_thread.joinable()) { + old_thread.join(); + } +} + void SubscriptionThreadDispatcher::handleTrackSubscribed( const std::string &participant_identity, TrackSource source, + const std::string &track_name, const std::shared_ptr &track) { if (!track) { LK_LOG_WARN( @@ -130,10 +203,18 @@ void SubscriptionThreadDispatcher::handleTrackSubscribed( participant_identity, static_cast(source), trackKindName(track->kind())); - CallbackKey key{participant_identity, source}; + CallbackKey key{participant_identity, TrackSource::SOURCE_UNKNOWN, track_name}; + CallbackKey fallback_key{participant_identity, source, ""}; std::thread old_thread; { std::lock_guard lock(lock_); + if (track->kind() == TrackKind::KIND_AUDIO && + audio_callbacks_.find(key) == audio_callbacks_.end()) { + key = fallback_key; + } else if (track->kind() == TrackKind::KIND_VIDEO && + video_callbacks_.find(key) == video_callbacks_.end()) { + key = fallback_key; + } old_thread = startReaderLocked(key, track); } if (old_thread.joinable()) { @@ -142,39 +223,151 @@ void SubscriptionThreadDispatcher::handleTrackSubscribed( } void SubscriptionThreadDispatcher::handleTrackUnsubscribed( - const std::string &participant_identity, TrackSource source) { - CallbackKey key{participant_identity, source}; + const std::string &participant_identity, TrackSource source, + const std::string &track_name) { + CallbackKey key{participant_identity, TrackSource::SOURCE_UNKNOWN, track_name}; + CallbackKey fallback_key{participant_identity, source, ""}; std::thread old_thread; + std::thread fallback_old_thread; { std::lock_guard lock(lock_); old_thread = extractReaderThreadLocked(key); + fallback_old_thread = extractReaderThreadLocked(fallback_key); LK_LOG_DEBUG("Handling unsubscribed track for participant={} source={} " - "stopped_reader={}", + "track_name={} stopped_reader={} fallback_stopped_reader={}", participant_identity, static_cast(source), - old_thread.joinable()); + track_name, old_thread.joinable(), + fallback_old_thread.joinable()); } if (old_thread.joinable()) { old_thread.join(); } + if (fallback_old_thread.joinable()) { + fallback_old_thread.join(); + } +} + +// ------------------------------------------------------------------- +// Data track callback registration +// ------------------------------------------------------------------- + +DataFrameCallbackId SubscriptionThreadDispatcher::addOnDataFrameCallback( + const std::string &participant_identity, const std::string &track_name, + DataFrameCallback callback) { + std::thread old_thread; + DataFrameCallbackId id; + { + std::lock_guard lock(lock_); + id = next_data_callback_id_++; + DataCallbackKey key{participant_identity, track_name}; + data_callbacks_[id] = RegisteredDataCallback{key, std::move(callback)}; + + auto track_it = remote_data_tracks_.find(key); + if (track_it != remote_data_tracks_.end()) { + old_thread = startDataReaderLocked(id, key, track_it->second, + data_callbacks_[id].callback); + } + } + if (old_thread.joinable()) { + old_thread.join(); + } + return id; +} + +void SubscriptionThreadDispatcher::removeOnDataFrameCallback( + DataFrameCallbackId id) { + std::thread old_thread; + { + std::lock_guard lock(lock_); + data_callbacks_.erase(id); + old_thread = extractDataReaderThreadLocked(id); + } + if (old_thread.joinable()) { + old_thread.join(); + } +} + +void SubscriptionThreadDispatcher::handleDataTrackPublished( + const std::shared_ptr &track) { + if (!track) { + LK_LOG_WARN("handleDataTrackPublished called with null track"); + return; + } + + LK_LOG_INFO("Handling data track published: \"{}\" from \"{}\" (sid={})", + track->info().name, track->publisherIdentity(), + track->info().sid); + + std::vector old_threads; + { + std::lock_guard lock(lock_); + DataCallbackKey key{track->publisherIdentity(), track->info().name}; + remote_data_tracks_[key] = track; + + for (auto &[id, reg] : data_callbacks_) { + if (reg.key == key) { + auto t = startDataReaderLocked(id, key, track, reg.callback); + if (t.joinable()) { + old_threads.push_back(std::move(t)); + } + } + } + } + for (auto &t : old_threads) { + t.join(); + } +} + +void SubscriptionThreadDispatcher::handleDataTrackUnpublished( + const std::string &sid) { + LK_LOG_INFO("Handling data track unpublished: sid={}", sid); + + std::vector old_threads; + { + std::lock_guard lock(lock_); + for (auto it = active_data_readers_.begin(); + it != active_data_readers_.end();) { + auto &reader = it->second; + if (reader->remote_track && reader->remote_track->info().sid == sid) { + { + std::lock_guard sub_guard(reader->sub_mutex); + if (reader->subscription) { + reader->subscription->close(); + } + } + if (reader->thread.joinable()) { + old_threads.push_back(std::move(reader->thread)); + } + it = active_data_readers_.erase(it); + } else { + ++it; + } + } + for (auto it = remote_data_tracks_.begin(); it != remote_data_tracks_.end(); + ++it) { + if (it->second && it->second->info().sid == sid) { + remote_data_tracks_.erase(it); + break; + } + } + } + for (auto &t : old_threads) { + t.join(); + } } void SubscriptionThreadDispatcher::stopAll() { std::vector threads; - std::size_t active_reader_count = 0; - std::size_t audio_callback_count = 0; - std::size_t video_callback_count = 0; { std::lock_guard lock(lock_); - active_reader_count = active_readers_.size(); - audio_callback_count = audio_callbacks_.size(); - video_callback_count = video_callbacks_.size(); LK_LOG_DEBUG("Stopping all subscription readers active_readers={} " - "audio_callbacks={} video_callbacks={}", - active_reader_count, audio_callback_count, - video_callback_count); + "active_data_readers={} audio_callbacks={} " + "video_callbacks={} data_callbacks={}", + active_readers_.size(), active_data_readers_.size(), + audio_callbacks_.size(), video_callbacks_.size(), + data_callbacks_.size()); + for (auto &[key, reader] : active_readers_) { - LK_LOG_TRACE("Closing active reader for participant={} source={}", - key.participant_identity, static_cast(key.source)); if (reader.audio_stream) { reader.audio_stream->close(); } @@ -188,6 +381,21 @@ void SubscriptionThreadDispatcher::stopAll() { active_readers_.clear(); audio_callbacks_.clear(); video_callbacks_.clear(); + + for (auto &[id, reader] : active_data_readers_) { + { + std::lock_guard sub_guard(reader->sub_mutex); + if (reader->subscription) { + reader->subscription->close(); + } + } + if (reader->thread.joinable()) { + threads.push_back(std::move(reader->thread)); + } + } + active_data_readers_.clear(); + data_callbacks_.clear(); + remote_data_tracks_.clear(); } for (auto &thread : threads) { thread.join(); @@ -199,13 +407,17 @@ std::thread SubscriptionThreadDispatcher::extractReaderThreadLocked( const CallbackKey &key) { auto it = active_readers_.find(key); if (it == active_readers_.end()) { - LK_LOG_TRACE("No active reader to extract for participant={} source={}", - key.participant_identity, static_cast(key.source)); + LK_LOG_TRACE("No active reader to extract for participant={} source={} " + "track_name={}", + key.participant_identity, static_cast(key.source), + key.track_name); return {}; } - LK_LOG_DEBUG("Extracting active reader for participant={} source={}", - key.participant_identity, static_cast(key.source)); + LK_LOG_DEBUG("Extracting active reader for participant={} source={} " + "track_name={}", + key.participant_identity, static_cast(key.source), + key.track_name); ActiveReader reader = std::move(it->second); active_readers_.erase(it); @@ -359,4 +571,102 @@ std::thread SubscriptionThreadDispatcher::startVideoReaderLocked( return old_thread; } +// ------------------------------------------------------------------- +// Data track reader helpers +// ------------------------------------------------------------------- + +std::thread SubscriptionThreadDispatcher::extractDataReaderThreadLocked( + DataFrameCallbackId id) { + auto it = active_data_readers_.find(id); + if (it == active_data_readers_.end()) { + return {}; + } + auto reader = std::move(it->second); + active_data_readers_.erase(it); + { + std::lock_guard guard(reader->sub_mutex); + if (reader->subscription) { + reader->subscription->close(); + } + } + return std::move(reader->thread); +} + +std::thread SubscriptionThreadDispatcher::extractDataReaderThreadLocked( + const DataCallbackKey &key) { + for (auto it = active_data_readers_.begin(); it != active_data_readers_.end(); + ++it) { + if (it->second && it->second->remote_track && + it->second->remote_track->publisherIdentity() == + key.participant_identity && + it->second->remote_track->info().name == key.track_name) { + auto reader = std::move(it->second); + active_data_readers_.erase(it); + { + std::lock_guard guard(reader->sub_mutex); + if (reader->subscription) { + reader->subscription->close(); + } + } + return std::move(reader->thread); + } + } + return {}; +} + +std::thread SubscriptionThreadDispatcher::startDataReaderLocked( + DataFrameCallbackId id, const DataCallbackKey &key, + const std::shared_ptr &track, DataFrameCallback cb) { + auto old_thread = extractDataReaderThreadLocked(id); + + int total_active = static_cast(active_readers_.size()) + + static_cast(active_data_readers_.size()); + if (total_active >= kMaxActiveReaders) { + LK_LOG_ERROR("Cannot start data reader for {} track={}: active reader " + "limit ({}) reached", + key.participant_identity, key.track_name, kMaxActiveReaders); + return old_thread; + } + + LK_LOG_INFO("Starting data reader for \"{}\" track=\"{}\"", + key.participant_identity, key.track_name); + + auto reader = std::make_shared(); + reader->remote_track = track; + auto identity = key.participant_identity; + auto track_name = key.track_name; + reader->thread = std::thread([reader, track, cb, identity, track_name]() { + LK_LOG_INFO("Data reader thread: subscribing to \"{}\" track=\"{}\"", + identity, track_name); + std::shared_ptr subscription; + try { + subscription = track->subscribe(); + } catch (const std::exception &e) { + LK_LOG_ERROR("Failed to subscribe to data track \"{}\" from \"{}\": {}", + track_name, identity, e.what()); + return; + } + LK_LOG_INFO("Data reader thread: subscribed to \"{}\" track=\"{}\"", + identity, track_name); + + { + std::lock_guard guard(reader->sub_mutex); + reader->subscription = subscription; + } + + DataFrame frame; + while (subscription->read(frame)) { + try { + cb(frame.payload, frame.user_timestamp); + } catch (const std::exception &e) { + LK_LOG_ERROR("Data frame callback exception: {}", e.what()); + } + } + LK_LOG_INFO("Data reader thread exiting for \"{}\" track=\"{}\"", identity, + track_name); + }); + active_data_readers_[id] = reader; + return old_thread; +} + } // namespace livekit diff --git a/src/tests/common/test_common.h b/src/tests/common/test_common.h index 0298e1f6..b217d6a8 100644 --- a/src/tests/common/test_common.h +++ b/src/tests/common/test_common.h @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include #include @@ -25,8 +26,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -46,6 +49,9 @@ constexpr int kDefaultTestIterations = 10; // Default stress test duration in seconds constexpr int kDefaultStressDurationSeconds = 600; // 10 minutes +// Local SFU URL used by end-to-end data track tests. +constexpr char kLocalTestLiveKitUrl[] = "ws://localhost:7880"; + // ============================================================================= // Common Test Configuration // ============================================================================= @@ -97,6 +103,11 @@ struct TestConfig { } }; +struct TestRoomConnectionOptions { + RoomOptions room_options; + RoomDelegate *delegate = nullptr; +}; + // ============================================================================= // Utility Functions // ============================================================================= @@ -121,6 +132,113 @@ inline bool waitForParticipant(Room *room, const std::string &identity, return false; } +inline std::array getDataTrackTestTokens() { + const char *token_a = std::getenv("LK_TOKEN_TEST_A"); + if (token_a == nullptr || std::string(token_a).empty()) { + throw std::runtime_error( + "LK_TOKEN_TEST_A must be present and non-empty for data track E2E " + "tests"); + } + + const char *token_b = std::getenv("LK_TOKEN_TEST_B"); + if (token_b == nullptr || std::string(token_b).empty()) { + throw std::runtime_error( + "LK_TOKEN_TEST_B must be present and non-empty for data track E2E " + "tests"); + } + + return {token_a, token_b}; +} + +inline void waitForParticipantVisibility( + const std::vector> &rooms, + std::chrono::milliseconds timeout = 5s) { + std::vector participant_identities; + participant_identities.reserve(rooms.size()); + for (const auto &room : rooms) { + if (!room || room->localParticipant() == nullptr) { + throw std::runtime_error( + "Test room is missing a local participant after connect"); + } + participant_identities.push_back(room->localParticipant()->identity()); + } + + auto start = std::chrono::steady_clock::now(); + while (std::chrono::steady_clock::now() - start < timeout) { + bool all_visible = true; + for (size_t i = 0; i < rooms.size(); ++i) { + const auto &room = rooms[i]; + if (!room || room->localParticipant() == nullptr) { + throw std::runtime_error( + "Test room is missing a local participant after connect"); + } + + for (size_t j = 0; j < participant_identities.size(); ++j) { + if (i == j) { + continue; + } + + if (room->remoteParticipant(participant_identities[j]) == nullptr) { + all_visible = false; + break; + } + } + + if (!all_visible) { + break; + } + } + + if (all_visible) { + return; + } + + std::this_thread::sleep_for(10ms); + } + + throw std::runtime_error("Not all test participants became visible"); +} + +inline std::vector> +testRooms(const std::vector &room_configs) { + if (room_configs.empty()) { + throw std::invalid_argument("testRooms requires at least one room"); + } + + if (room_configs.size() > 2) { + throw std::invalid_argument( + "testRooms supports at most two rooms with LK_TOKEN_TEST_A/B"); + } + + auto tokens = getDataTrackTestTokens(); + + std::vector> rooms; + rooms.reserve(room_configs.size()); + + for (size_t i = 0; i < room_configs.size(); ++i) { + auto room = std::make_unique(); + if (room_configs[i].delegate != nullptr) { + room->setDelegate(room_configs[i].delegate); + } + + if (!room->Connect(kLocalTestLiveKitUrl, tokens[i], + room_configs[i].room_options)) { + throw std::runtime_error("Failed to connect test room " + + std::to_string(i)); + } + + rooms.push_back(std::move(room)); + } + + waitForParticipantVisibility(rooms); + return rooms; +} + +inline std::vector> testRooms(size_t count) { + std::vector room_configs(count); + return testRooms(room_configs); +} + // ============================================================================= // Statistics Collection // ============================================================================= diff --git a/src/tests/integration/test_data_track.cpp b/src/tests/integration/test_data_track.cpp new file mode 100644 index 00000000..59b36f2a --- /dev/null +++ b/src/tests/integration/test_data_track.cpp @@ -0,0 +1,672 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// This test is used to verify that data tracks are published and received +// correctly. It is the same implementation as the rust +// client-sdk-rust/livekit/tests/data_track_test.rs test. To run this test, run +// a local SFU, set credentials examples/tokens/set_data_track_test_tokens.bash, +// and run: +// ./build-debug/bin/livekit_integration_tests + +#include "../common/test_common.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace livekit { +namespace test { + +using namespace std::chrono_literals; + +namespace { + +constexpr char kTrackNamePrefix[] = "data_track_e2e"; +constexpr auto kPublishDuration = 5s; +constexpr auto kTrackWaitTimeout = 10s; +constexpr auto kReadTimeout = 30s; +constexpr auto kPollingInterval = 10ms; +constexpr float kMinimumReceivedPercent = 0.95f; +constexpr int kResubscribeIterations = 10; +constexpr int kPublishManyTrackCount = 256; +constexpr auto kPublishManyTimeout = 5s; +constexpr std::size_t kLargeFramePayloadBytes = 196608; +constexpr char kE2EESharedSecret[] = "password"; +constexpr int kE2EEFrameCount = 5; + +std::string makeTrackName(const std::string &suffix) { + return std::string(kTrackNamePrefix) + "_" + suffix + "_" + + std::to_string(getTimestampUs()); +} + +std::vector e2eeSharedKey() { + return std::vector( + kE2EESharedSecret, kE2EESharedSecret + sizeof(kE2EESharedSecret) - 1); +} + +E2EEOptions makeE2EEOptions() { + E2EEOptions options; + options.key_provider_options.shared_key = e2eeSharedKey(); + return options; +} + +std::vector +encryptedRoomConfigs(RoomDelegate *subscriber_delegate) { + std::vector room_configs(2); + room_configs[0].room_options.encryption = makeE2EEOptions(); + room_configs[1].room_options.encryption = makeE2EEOptions(); + room_configs[1].delegate = subscriber_delegate; + return room_configs; +} + +template +bool waitForCondition(Predicate &&predicate, std::chrono::milliseconds timeout, + std::chrono::milliseconds interval = kPollingInterval) { + auto start = std::chrono::steady_clock::now(); + while (std::chrono::steady_clock::now() - start < timeout) { + if (predicate()) { + return true; + } + std::this_thread::sleep_for(interval); + } + return false; +} + +class DataTrackPublishedDelegate : public RoomDelegate { +public: + void onDataTrackPublished(Room &, + const DataTrackPublishedEvent &event) override { + if (!event.track) { + return; + } + + std::lock_guard lock(mutex_); + tracks_.push_back(event.track); + cv_.notify_all(); + } + + std::shared_ptr + waitForTrack(std::chrono::milliseconds timeout) { + std::unique_lock lock(mutex_); + if (!cv_.wait_for(lock, timeout, [this] { return !tracks_.empty(); })) { + return nullptr; + } + return tracks_.front(); + } + +private: + std::mutex mutex_; + std::condition_variable cv_; + std::vector> tracks_; +}; + +DataFrame +readFrameWithTimeout(const std::shared_ptr &subscription, + std::chrono::milliseconds timeout) { + std::promise frame_promise; + auto future = frame_promise.get_future(); + + std::thread reader([subscription, + promise = std::move(frame_promise)]() mutable { + try { + DataFrame frame; + if (!subscription->read(frame)) { + throw std::runtime_error("Subscription ended before a frame arrived"); + } + promise.set_value(std::move(frame)); + } catch (...) { + promise.set_exception(std::current_exception()); + } + }); + + if (future.wait_for(timeout) != std::future_status::ready) { + subscription->close(); + } + + reader.join(); + return future.get(); +} + +} // namespace + +class DataTrackE2ETest : public LiveKitTestBase {}; + +class DataTrackTransportTest + : public DataTrackE2ETest, + public ::testing::WithParamInterface> {}; + +TEST_P(DataTrackTransportTest, PublishesAndReceivesFramesEndToEnd) { + const auto publish_fps = std::get<0>(GetParam()); + const auto payload_len = std::get<1>(GetParam()); + const auto track_name = makeTrackName("transport"); + const auto frame_count = static_cast(std::llround( + std::chrono::duration(kPublishDuration).count() * publish_fps)); + + DataTrackPublishedDelegate subscriber_delegate; + std::vector room_configs(2); + room_configs[1].delegate = &subscriber_delegate; + + auto rooms = testRooms(room_configs); + auto &publisher_room = rooms[0]; + const auto publisher_identity = + publisher_room->localParticipant()->identity(); + + std::exception_ptr publish_error; + std::thread publisher([&]() { + try { + auto track = + publisher_room->localParticipant()->publishDataTrack(track_name); + if (!track || !track->isPublished()) { + throw std::runtime_error("Publisher failed to publish data track"); + } + if (track->info().uses_e2ee) { + throw std::runtime_error("Unexpected E2EE on test data track"); + } + if (track->info().name != track_name) { + throw std::runtime_error("Published track name mismatch"); + } + + const auto frame_interval = + std::chrono::duration_cast( + std::chrono::duration(1.0 / publish_fps)); + auto next_send = std::chrono::steady_clock::now(); + + std::cout << "Publishing " << frame_count + << " frames with payload length " << payload_len << std::endl; + for (size_t index = 0; index < frame_count; ++index) { + std::vector payload(payload_len, + static_cast(index)); + if (!track->tryPush(payload)) { + throw std::runtime_error("Failed to push data frame"); + } + + next_send += frame_interval; + std::this_thread::sleep_until(next_send); + } + + track->unpublishDataTrack(); + } catch (...) { + publish_error = std::current_exception(); + } + }); + + auto remote_track = subscriber_delegate.waitForTrack(kTrackWaitTimeout); + ASSERT_NE(remote_track, nullptr) << "Timed out waiting for remote data track"; + EXPECT_TRUE(remote_track->isPublished()); + EXPECT_FALSE(remote_track->info().uses_e2ee); + EXPECT_EQ(remote_track->info().name, track_name); + EXPECT_EQ(remote_track->publisherIdentity(), publisher_identity); + + auto subscription = remote_track->subscribe(); + ASSERT_NE(subscription, nullptr); + + std::promise receive_count_promise; + auto receive_count_future = receive_count_promise.get_future(); + std::exception_ptr subscribe_error; + std::thread subscriber([&]() { + try { + size_t received_count = 0; + DataFrame frame; + while (subscription->read(frame) && received_count < frame_count) { + if (frame.payload.empty()) { + throw std::runtime_error("Received empty data frame"); + } + + const auto first_byte = frame.payload.front(); + if (!std::all_of(frame.payload.begin(), frame.payload.end(), + [first_byte](std::uint8_t byte) { + return byte == first_byte; + })) { + throw std::runtime_error("Received frame with inconsistent payload"); + } + if (frame.user_timestamp.has_value()) { + throw std::runtime_error( + "Received unexpected user timestamp in transport test"); + } + + ++received_count; + } + + receive_count_promise.set_value(received_count); + } catch (...) { + subscribe_error = std::current_exception(); + receive_count_promise.set_exception(std::current_exception()); + } + }); + + if (receive_count_future.wait_for(kReadTimeout) != + std::future_status::ready) { + subscription->close(); + ADD_FAILURE() << "Timed out waiting for data frames"; + } + + subscriber.join(); + publisher.join(); + + if (publish_error) { + std::rethrow_exception(publish_error); + } + if (subscribe_error) { + std::rethrow_exception(subscribe_error); + } + + const auto received_count = receive_count_future.get(); + const auto received_percent = + static_cast(received_count) / static_cast(frame_count); + std::cout << "Received " << received_count << "/" << frame_count + << " frames (" << received_percent * 100.0f << "%)" << std::endl; + + EXPECT_GE(received_percent, kMinimumReceivedPercent) + << "Received " << received_count << "/" << frame_count << " frames"; +} + +TEST_F(DataTrackE2ETest, UnpublishUpdatesPublishedStateEndToEnd) { + const auto track_name = makeTrackName("published_state"); + + DataTrackPublishedDelegate subscriber_delegate; + std::vector room_configs(2); + room_configs[1].delegate = &subscriber_delegate; + + auto rooms = testRooms(room_configs); + auto &publisher_room = rooms[0]; + + auto local_track = + publisher_room->localParticipant()->publishDataTrack(track_name); + ASSERT_NE(local_track, nullptr); + ASSERT_TRUE(local_track->isPublished()); + + auto remote_track = subscriber_delegate.waitForTrack(kTrackWaitTimeout); + ASSERT_NE(remote_track, nullptr) << "Timed out waiting for remote data track"; + EXPECT_TRUE(remote_track->isPublished()); + + std::this_thread::sleep_for(500ms); + local_track->unpublishDataTrack(); + + EXPECT_FALSE(local_track->isPublished()); + EXPECT_TRUE( + waitForCondition([&]() { return !remote_track->isPublished(); }, 2s)) + << "Remote track did not report unpublished state"; +} + +TEST_F(DataTrackE2ETest, PublishManyTracks) { + auto rooms = testRooms(1); + auto &room = rooms[0]; + + std::vector> tracks; + tracks.reserve(kPublishManyTrackCount); + + const auto start = std::chrono::steady_clock::now(); + for (int index = 0; index < kPublishManyTrackCount; ++index) { + const auto track_name = "track_" + std::to_string(index); + auto track = room->localParticipant()->publishDataTrack(track_name); + + ASSERT_NE(track, nullptr) << "Failed to publish track " << track_name; + EXPECT_TRUE(track->isPublished()) + << "Track was not published: " << track_name; + EXPECT_EQ(track->info().name, track_name); + + tracks.push_back(std::move(track)); + } + const auto elapsed = std::chrono::steady_clock::now() - start; + + std::cout + << "Publishing " << kPublishManyTrackCount << " tracks took " + << std::chrono::duration_cast(elapsed).count() + << " ms" << std::endl; + EXPECT_LT(elapsed, kPublishManyTimeout); + + // This test intentionally creates bursty data-track traffic by pushing a + // large frame on every published track in quick succession. The RTC sender + // path uses bounded queues, so under this load not every packet is expected + // to make it onto the transport and "Failed to enqueue data track packet" + // logs are expected. The purpose of this test is to verify publish/push + // behavior and local track state, not end-to-end delivery of every packet. + for (const auto &track : tracks) { + EXPECT_TRUE(track->tryPush( + std::vector(kLargeFramePayloadBytes, 0xFA))) + << "Failed to push large frame on track " << track->info().name; + std::this_thread::sleep_for(50ms); + } + + for (const auto &track : tracks) { + track->unpublishDataTrack(); + EXPECT_FALSE(track->isPublished()); + } +} + +TEST_F(DataTrackE2ETest, PublishDuplicateName) { + auto rooms = testRooms(1); + auto &room = rooms[0]; + + auto first_track = room->localParticipant()->publishDataTrack("first"); + ASSERT_NE(first_track, nullptr); + ASSERT_TRUE(first_track->isPublished()); + + try { + (void)room->localParticipant()->publishDataTrack("first"); + FAIL() << "Expected duplicate data-track name to be rejected"; + } catch (const std::runtime_error &error) { + const std::string message = error.what(); + EXPECT_FALSE(message.empty()); + } + + first_track->unpublishDataTrack(); +} + +TEST_F(DataTrackE2ETest, CanResubscribeToRemoteDataTrack) { + const auto track_name = makeTrackName("resubscribe"); + + DataTrackPublishedDelegate subscriber_delegate; + std::vector room_configs(2); + room_configs[1].delegate = &subscriber_delegate; + + auto rooms = testRooms(room_configs); + auto &publisher_room = rooms[0]; + + std::atomic keep_publishing{true}; + std::exception_ptr publish_error; + std::thread publisher([&]() { + try { + auto track = + publisher_room->localParticipant()->publishDataTrack(track_name); + if (!track || !track->isPublished()) { + throw std::runtime_error("Publisher failed to publish data track"); + } + + while (keep_publishing.load()) { + if (!track->tryPush(std::vector(64, 0xFA))) { + throw std::runtime_error("Failed to push resubscribe test frame"); + } + std::this_thread::sleep_for(50ms); + } + + track->unpublishDataTrack(); + } catch (...) { + publish_error = std::current_exception(); + } + }); + + auto remote_track = subscriber_delegate.waitForTrack(kTrackWaitTimeout); + ASSERT_NE(remote_track, nullptr) << "Timed out waiting for remote data track"; + + for (int iteration = 0; iteration < kResubscribeIterations; ++iteration) { + auto subscription = remote_track->subscribe(); + ASSERT_NE(subscription, nullptr); + + auto frame = readFrameWithTimeout(subscription, 5s); + EXPECT_FALSE(frame.payload.empty()) << "Iteration " << iteration; + + subscription->close(); + std::this_thread::sleep_for(50ms); + } + + keep_publishing.store(false); + publisher.join(); + + if (publish_error) { + std::rethrow_exception(publish_error); + } +} + +TEST_F(DataTrackE2ETest, PreservesUserTimestampEndToEnd) { + const auto track_name = makeTrackName("user_timestamp"); + const auto sent_timestamp = getTimestampUs(); + + DataTrackPublishedDelegate subscriber_delegate; + std::vector room_configs(2); + room_configs[1].delegate = &subscriber_delegate; + + auto rooms = testRooms(room_configs); + auto &publisher_room = rooms[0]; + + auto local_track = + publisher_room->localParticipant()->publishDataTrack(track_name); + ASSERT_NE(local_track, nullptr); + ASSERT_TRUE(local_track->isPublished()); + + auto remote_track = subscriber_delegate.waitForTrack(kTrackWaitTimeout); + ASSERT_NE(remote_track, nullptr) << "Timed out waiting for remote data track"; + + auto subscription = remote_track->subscribe(); + ASSERT_NE(subscription, nullptr); + + std::promise frame_promise; + auto frame_future = frame_promise.get_future(); + std::thread reader([&]() { + try { + DataFrame frame; + if (!subscription->read(frame)) { + throw std::runtime_error( + "Subscription ended before timestamped frame arrived"); + } + frame_promise.set_value(std::move(frame)); + } catch (...) { + frame_promise.set_exception(std::current_exception()); + } + }); + + const bool push_ok = + local_track->tryPush(std::vector(64, 0xFA), sent_timestamp); + const auto frame_status = frame_future.wait_for(5s); + + if (frame_status != std::future_status::ready) { + subscription->close(); + } + + subscription->close(); + reader.join(); + local_track->unpublishDataTrack(); + + ASSERT_TRUE(push_ok) << "Failed to push timestamped data frame"; + ASSERT_EQ(frame_status, std::future_status::ready) + << "Timed out waiting for timestamped frame"; + + DataFrame frame; + try { + frame = frame_future.get(); + } catch (const std::exception &e) { + FAIL() << e.what(); + } + + ASSERT_FALSE(frame.payload.empty()); + ASSERT_TRUE(frame.user_timestamp.has_value()); + EXPECT_EQ(frame.user_timestamp.value(), sent_timestamp); +} + +TEST_F(DataTrackE2ETest, PublishesAndReceivesEncryptedFramesEndToEnd) { + const auto track_name = makeTrackName("e2ee_transport"); + + DataTrackPublishedDelegate subscriber_delegate; + auto room_configs = encryptedRoomConfigs(&subscriber_delegate); + auto rooms = testRooms(room_configs); + auto &publisher_room = rooms[0]; + auto &subscriber_room = rooms[1]; + + ASSERT_NE(publisher_room->e2eeManager(), nullptr); + ASSERT_NE(subscriber_room->e2eeManager(), nullptr); + ASSERT_NE(publisher_room->e2eeManager()->keyProvider(), nullptr); + ASSERT_NE(subscriber_room->e2eeManager()->keyProvider(), nullptr); + publisher_room->e2eeManager()->setEnabled(true); + subscriber_room->e2eeManager()->setEnabled(true); + EXPECT_EQ(publisher_room->e2eeManager()->keyProvider()->exportSharedKey(), + e2eeSharedKey()); + EXPECT_EQ(subscriber_room->e2eeManager()->keyProvider()->exportSharedKey(), + e2eeSharedKey()); + + auto local_track = + publisher_room->localParticipant()->publishDataTrack(track_name); + ASSERT_NE(local_track, nullptr); + ASSERT_TRUE(local_track->isPublished()); + EXPECT_TRUE(local_track->info().uses_e2ee); + + auto remote_track = subscriber_delegate.waitForTrack(kTrackWaitTimeout); + ASSERT_NE(remote_track, nullptr) << "Timed out waiting for remote data track"; + EXPECT_TRUE(remote_track->isPublished()); + EXPECT_TRUE(remote_track->info().uses_e2ee); + EXPECT_EQ(remote_track->info().name, track_name); + + auto subscription = remote_track->subscribe(); + ASSERT_NE(subscription, nullptr); + + std::promise frame_promise; + auto frame_future = frame_promise.get_future(); + std::thread reader([&]() { + try { + DataFrame frame; + if (!subscription->read(frame)) { + throw std::runtime_error( + "Subscription ended before an encrypted frame arrived"); + } + frame_promise.set_value(std::move(frame)); + } catch (...) { + frame_promise.set_exception(std::current_exception()); + } + }); + + bool pushed = false; + for (int index = 0; index < 200; ++index) { + std::vector payload(kLargeFramePayloadBytes, + static_cast(index + 1)); + pushed = local_track->tryPush(payload) || pushed; + if (frame_future.wait_for(25ms) == std::future_status::ready) { + break; + } + } + + const auto frame_status = frame_future.wait_for(5s); + if (frame_status != std::future_status::ready) { + subscription->close(); + } + reader.join(); + ASSERT_TRUE(pushed) << "Failed to push encrypted data frames"; + ASSERT_EQ(frame_status, std::future_status::ready) + << "Timed out waiting for encrypted frame delivery"; + + DataFrame frame; + try { + frame = frame_future.get(); + } catch (const std::exception &e) { + FAIL() << e.what(); + } + ASSERT_FALSE(frame.payload.empty()); + const auto first_byte = frame.payload.front(); + EXPECT_TRUE(std::all_of( + frame.payload.begin(), frame.payload.end(), + [first_byte](std::uint8_t byte) { return byte == first_byte; })) + << "Encrypted payload is not byte-consistent"; + EXPECT_FALSE(frame.user_timestamp.has_value()) + << "Unexpected user timestamp on encrypted frame"; + + subscription->close(); + local_track->unpublishDataTrack(); +} + +TEST_F(DataTrackE2ETest, PreservesUserTimestampOnEncryptedDataTrack) { + const auto track_name = makeTrackName("e2ee_user_timestamp"); + const auto sent_timestamp = getTimestampUs(); + const std::vector payload(64, 0xFA); + + DataTrackPublishedDelegate subscriber_delegate; + auto room_configs = encryptedRoomConfigs(&subscriber_delegate); + auto rooms = testRooms(room_configs); + auto &publisher_room = rooms[0]; + auto &subscriber_room = rooms[1]; + + ASSERT_NE(publisher_room->e2eeManager(), nullptr); + ASSERT_NE(subscriber_room->e2eeManager(), nullptr); + publisher_room->e2eeManager()->setEnabled(true); + subscriber_room->e2eeManager()->setEnabled(true); + + auto local_track = + publisher_room->localParticipant()->publishDataTrack(track_name); + ASSERT_NE(local_track, nullptr); + ASSERT_TRUE(local_track->isPublished()); + EXPECT_TRUE(local_track->info().uses_e2ee); + + auto remote_track = subscriber_delegate.waitForTrack(kTrackWaitTimeout); + ASSERT_NE(remote_track, nullptr) << "Timed out waiting for remote data track"; + EXPECT_TRUE(remote_track->info().uses_e2ee); + + auto subscription = remote_track->subscribe(); + ASSERT_NE(subscription, nullptr); + + std::promise frame_promise; + auto frame_future = frame_promise.get_future(); + std::thread reader([&]() { + try { + DataFrame incoming_frame; + if (!subscription->read(incoming_frame)) { + throw std::runtime_error( + "Subscription ended before timestamped encrypted frame arrived"); + } + frame_promise.set_value(std::move(incoming_frame)); + } catch (...) { + frame_promise.set_exception(std::current_exception()); + } + }); + + bool pushed = false; + for (int attempt = 0; attempt < 200; ++attempt) { + pushed = local_track->tryPush(payload, sent_timestamp) || pushed; + if (frame_future.wait_for(25ms) == std::future_status::ready) { + break; + } + } + const auto frame_status = frame_future.wait_for(5s); + if (frame_status != std::future_status::ready) { + subscription->close(); + } + + reader.join(); + ASSERT_TRUE(pushed) << "Failed to push timestamped encrypted frame"; + ASSERT_EQ(frame_status, std::future_status::ready) + << "Timed out waiting for timestamped encrypted frame"; + + DataFrame frame; + try { + frame = frame_future.get(); + } catch (const std::exception &e) { + FAIL() << e.what(); + } + EXPECT_EQ(frame.payload, payload); + ASSERT_TRUE(frame.user_timestamp.has_value()); + EXPECT_EQ(frame.user_timestamp.value(), sent_timestamp); + + subscription->close(); + local_track->unpublishDataTrack(); +} + +std::string dataTrackParamName( + const ::testing::TestParamInfo> &info) { + if (std::get<0>(info.param) > 100.0) { + return "HighFpsSinglePacket"; + } + return "LowFpsMultiPacket"; +} + +INSTANTIATE_TEST_SUITE_P(DataTrackScenarios, DataTrackTransportTest, + ::testing::Values(std::make_tuple(120.0, size_t{8192}), + std::make_tuple(10.0, + size_t{196608})), + dataTrackParamName); + +} // namespace test +} // namespace livekit diff --git a/src/tests/integration/test_room_callbacks.cpp b/src/tests/integration/test_room_callbacks.cpp new file mode 100644 index 00000000..a15a151d --- /dev/null +++ b/src/tests/integration/test_room_callbacks.cpp @@ -0,0 +1,272 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// @file test_room_callbacks.cpp +/// @brief Public API tests for Room callback registration. + +#include +#include + +#include +#include +#include +#include + +namespace livekit { + +class RoomCallbackTest : public ::testing::Test { +protected: + void SetUp() override { + livekit::initialize(livekit::LogLevel::Info, livekit::LogSink::kConsole); + } + + void TearDown() override { livekit::shutdown(); } +}; + +TEST_F(RoomCallbackTest, AudioCallbackRegistrationIsAccepted) { + Room room; + + EXPECT_NO_THROW(room.setOnAudioFrameCallback( + "alice", TrackSource::SOURCE_MICROPHONE, [](const AudioFrame &) {})); +} + +TEST_F(RoomCallbackTest, VideoCallbackRegistrationIsAccepted) { + Room room; + + EXPECT_NO_THROW( + room.setOnVideoFrameCallback("alice", TrackSource::SOURCE_CAMERA, + [](const VideoFrame &, std::int64_t) {})); +} + +TEST_F(RoomCallbackTest, AudioCallbackRegistrationByTrackNameIsAccepted) { + Room room; + + EXPECT_NO_THROW(room.setOnAudioFrameCallback( + "alice", "mic-main", [](const AudioFrame &) {})); +} + +TEST_F(RoomCallbackTest, VideoCallbackRegistrationByTrackNameIsAccepted) { + Room room; + + EXPECT_NO_THROW(room.setOnVideoFrameCallback( + "alice", "cam-main", [](const VideoFrame &, std::int64_t) {})); +} + +TEST_F(RoomCallbackTest, ClearingMissingCallbacksIsNoOp) { + Room room; + + EXPECT_NO_THROW( + room.clearOnAudioFrameCallback("nobody", TrackSource::SOURCE_MICROPHONE)); + EXPECT_NO_THROW( + room.clearOnVideoFrameCallback("nobody", TrackSource::SOURCE_CAMERA)); + EXPECT_NO_THROW(room.clearOnAudioFrameCallback("nobody", "missing-audio")); + EXPECT_NO_THROW(room.clearOnVideoFrameCallback("nobody", "missing-video")); +} + +TEST_F(RoomCallbackTest, ReRegisteringSameAudioKeyDoesNotThrow) { + Room room; + std::atomic counter1{0}; + std::atomic counter2{0}; + + EXPECT_NO_THROW(room.setOnAudioFrameCallback( + "alice", TrackSource::SOURCE_MICROPHONE, + [&counter1](const AudioFrame &) { counter1++; })); + EXPECT_NO_THROW(room.setOnAudioFrameCallback( + "alice", TrackSource::SOURCE_MICROPHONE, + [&counter2](const AudioFrame &) { counter2++; })); +} + +TEST_F(RoomCallbackTest, ReRegisteringSameVideoKeyDoesNotThrow) { + Room room; + + EXPECT_NO_THROW( + room.setOnVideoFrameCallback("alice", TrackSource::SOURCE_CAMERA, + [](const VideoFrame &, std::int64_t) {})); + EXPECT_NO_THROW( + room.setOnVideoFrameCallback("alice", TrackSource::SOURCE_CAMERA, + [](const VideoFrame &, std::int64_t) {})); +} + +TEST_F(RoomCallbackTest, DistinctAudioAndVideoCallbacksCanCoexist) { + Room room; + + EXPECT_NO_THROW(room.setOnAudioFrameCallback( + "alice", TrackSource::SOURCE_MICROPHONE, [](const AudioFrame &) {})); + EXPECT_NO_THROW( + room.setOnVideoFrameCallback("alice", TrackSource::SOURCE_CAMERA, + [](const VideoFrame &, std::int64_t) {})); + EXPECT_NO_THROW(room.setOnAudioFrameCallback( + "bob", TrackSource::SOURCE_MICROPHONE, [](const AudioFrame &) {})); + EXPECT_NO_THROW( + room.setOnVideoFrameCallback("bob", TrackSource::SOURCE_CAMERA, + [](const VideoFrame &, std::int64_t) {})); +} + +TEST_F(RoomCallbackTest, SameSourceDifferentTrackNamesAreAccepted) { + Room room; + + EXPECT_NO_THROW( + room.setOnVideoFrameCallback("alice", "cam-main", + [](const VideoFrame &, std::int64_t) {})); + EXPECT_NO_THROW( + room.setOnVideoFrameCallback("alice", "cam-backup", + [](const VideoFrame &, std::int64_t) {})); +} + +TEST_F(RoomCallbackTest, ClearingTrackNameCallbackIsAccepted) { + Room room; + + EXPECT_NO_THROW(room.setOnAudioFrameCallback( + "alice", "mic-main", [](const AudioFrame &) {})); + EXPECT_NO_THROW(room.clearOnAudioFrameCallback("alice", "mic-main")); +} + +TEST_F(RoomCallbackTest, SourceAndTrackNameCallbacksCanCoexist) { + Room room; + + EXPECT_NO_THROW(room.setOnAudioFrameCallback( + "alice", TrackSource::SOURCE_MICROPHONE, [](const AudioFrame &) {})); + EXPECT_NO_THROW(room.setOnAudioFrameCallback( + "alice", "mic-main", [](const AudioFrame &) {})); +} + +TEST_F(RoomCallbackTest, DataCallbackRegistrationReturnsUsableIds) { + Room room; + + const auto id1 = room.addOnDataFrameCallback( + "alice", "track-a", + [](const std::vector &, std::optional) {}); + const auto id2 = room.addOnDataFrameCallback( + "alice", "track-a", + [](const std::vector &, std::optional) {}); + + EXPECT_NE(id1, std::numeric_limits::max()); + EXPECT_NE(id2, std::numeric_limits::max()); + EXPECT_NE(id1, id2); + + EXPECT_NO_THROW(room.removeOnDataFrameCallback(id1)); + EXPECT_NO_THROW(room.removeOnDataFrameCallback(id2)); +} + +TEST_F(RoomCallbackTest, RemovingUnknownDataCallbackIsNoOp) { + Room room; + + EXPECT_NO_THROW(room.removeOnDataFrameCallback( + std::numeric_limits::max())); +} + +TEST_F(RoomCallbackTest, DestroyRoomWithRegisteredCallbacksIsSafe) { + EXPECT_NO_THROW({ + Room room; + room.setOnAudioFrameCallback("alice", TrackSource::SOURCE_MICROPHONE, + [](const AudioFrame &) {}); + room.setOnVideoFrameCallback("bob", TrackSource::SOURCE_CAMERA, + [](const VideoFrame &, std::int64_t) {}); + room.addOnDataFrameCallback( + "carol", "track", + [](const std::vector &, std::optional) {}); + }); +} + +TEST_F(RoomCallbackTest, DestroyRoomAfterClearingCallbacksIsSafe) { + EXPECT_NO_THROW({ + Room room; + room.setOnAudioFrameCallback("alice", TrackSource::SOURCE_MICROPHONE, + [](const AudioFrame &) {}); + room.clearOnAudioFrameCallback("alice", TrackSource::SOURCE_MICROPHONE); + + const auto id = room.addOnDataFrameCallback( + "alice", "track", + [](const std::vector &, std::optional) {}); + room.removeOnDataFrameCallback(id); + }); +} + +TEST_F(RoomCallbackTest, ConcurrentRegistrationDoesNotCrash) { + Room room; + constexpr int kThreads = 8; + constexpr int kIterations = 100; + + std::vector threads; + threads.reserve(kThreads); + + for (int t = 0; t < kThreads; ++t) { + threads.emplace_back([&room, t]() { + for (int i = 0; i < kIterations; ++i) { + const std::string id = "participant-" + std::to_string(t); + room.setOnAudioFrameCallback(id, TrackSource::SOURCE_MICROPHONE, + [](const AudioFrame &) {}); + room.clearOnAudioFrameCallback(id, TrackSource::SOURCE_MICROPHONE); + } + }); + } + + for (auto &thread : threads) { + thread.join(); + } + + SUCCEED(); +} + +TEST_F(RoomCallbackTest, ConcurrentMixedRegistrationDoesNotCrash) { + Room room; + constexpr int kThreads = 4; + constexpr int kIterations = 50; + + std::vector threads; + threads.reserve(kThreads); + + for (int t = 0; t < kThreads; ++t) { + threads.emplace_back([&room, t]() { + const std::string id = "p-" + std::to_string(t); + for (int i = 0; i < kIterations; ++i) { + room.setOnAudioFrameCallback(id, TrackSource::SOURCE_MICROPHONE, + [](const AudioFrame &) {}); + room.setOnVideoFrameCallback(id, TrackSource::SOURCE_CAMERA, + [](const VideoFrame &, std::int64_t) {}); + const auto data_id = + room.addOnDataFrameCallback(id, "track", + [](const std::vector &, + std::optional) {}); + room.removeOnDataFrameCallback(data_id); + } + }); + } + + for (auto &thread : threads) { + thread.join(); + } + + SUCCEED(); +} + +TEST_F(RoomCallbackTest, ManyDistinctAudioCallbacksCanBeRegisteredAndCleared) { + Room room; + constexpr int kCount = 50; + + for (int i = 0; i < kCount; ++i) { + EXPECT_NO_THROW(room.setOnAudioFrameCallback( + "participant-" + std::to_string(i), TrackSource::SOURCE_MICROPHONE, + [](const AudioFrame &) {})); + } + + for (int i = 0; i < kCount; ++i) { + EXPECT_NO_THROW(room.clearOnAudioFrameCallback( + "participant-" + std::to_string(i), TrackSource::SOURCE_MICROPHONE)); + } +} + +} // namespace livekit diff --git a/src/tests/integration/test_subscription_thread_dispatcher.cpp b/src/tests/integration/test_subscription_thread_dispatcher.cpp index 71601a18..8bdee46a 100644 --- a/src/tests/integration/test_subscription_thread_dispatcher.cpp +++ b/src/tests/integration/test_subscription_thread_dispatcher.cpp @@ -36,6 +36,8 @@ class SubscriptionThreadDispatcherTest : public ::testing::Test { using CallbackKey = SubscriptionThreadDispatcher::CallbackKey; using CallbackKeyHash = SubscriptionThreadDispatcher::CallbackKeyHash; + using DataCallbackKey = SubscriptionThreadDispatcher::DataCallbackKey; + using DataCallbackKeyHash = SubscriptionThreadDispatcher::DataCallbackKeyHash; static auto &audioCallbacks(SubscriptionThreadDispatcher &dispatcher) { return dispatcher.audio_callbacks_; @@ -46,6 +48,15 @@ class SubscriptionThreadDispatcherTest : public ::testing::Test { static auto &activeReaders(SubscriptionThreadDispatcher &dispatcher) { return dispatcher.active_readers_; } + static auto &dataCallbacks(SubscriptionThreadDispatcher &dispatcher) { + return dispatcher.data_callbacks_; + } + static auto &activeDataReaders(SubscriptionThreadDispatcher &dispatcher) { + return dispatcher.active_data_readers_; + } + static auto &remoteDataTracks(SubscriptionThreadDispatcher &dispatcher) { + return dispatcher.remote_data_tracks_; + } static int maxActiveReaders() { return SubscriptionThreadDispatcher::kMaxActiveReaders; } @@ -56,21 +67,27 @@ class SubscriptionThreadDispatcherTest : public ::testing::Test { // ============================================================================ TEST_F(SubscriptionThreadDispatcherTest, CallbackKeyEqualKeysCompareEqual) { - CallbackKey a{"alice", TrackSource::SOURCE_MICROPHONE}; - CallbackKey b{"alice", TrackSource::SOURCE_MICROPHONE}; + CallbackKey a{"alice", TrackSource::SOURCE_MICROPHONE, ""}; + CallbackKey b{"alice", TrackSource::SOURCE_MICROPHONE, ""}; EXPECT_TRUE(a == b); } TEST_F(SubscriptionThreadDispatcherTest, CallbackKeyDifferentIdentityNotEqual) { - CallbackKey a{"alice", TrackSource::SOURCE_MICROPHONE}; - CallbackKey b{"bob", TrackSource::SOURCE_MICROPHONE}; + CallbackKey a{"alice", TrackSource::SOURCE_MICROPHONE, ""}; + CallbackKey b{"bob", TrackSource::SOURCE_MICROPHONE, ""}; EXPECT_FALSE(a == b); } TEST_F(SubscriptionThreadDispatcherTest, CallbackKeyDifferentSourceNotEqual) { - CallbackKey a{"alice", TrackSource::SOURCE_MICROPHONE}; - CallbackKey b{"alice", TrackSource::SOURCE_CAMERA}; + CallbackKey a{"alice", TrackSource::SOURCE_MICROPHONE, ""}; + CallbackKey b{"alice", TrackSource::SOURCE_CAMERA, ""}; + EXPECT_FALSE(a == b); +} + +TEST_F(SubscriptionThreadDispatcherTest, CallbackKeyDifferentTrackNameNotEqual) { + CallbackKey a{"alice", TrackSource::SOURCE_UNKNOWN, "cam-main"}; + CallbackKey b{"alice", TrackSource::SOURCE_UNKNOWN, "cam-backup"}; EXPECT_FALSE(a == b); } @@ -80,8 +97,8 @@ TEST_F(SubscriptionThreadDispatcherTest, CallbackKeyDifferentSourceNotEqual) { TEST_F(SubscriptionThreadDispatcherTest, CallbackKeyHashEqualKeysProduceSameHash) { - CallbackKey a{"alice", TrackSource::SOURCE_MICROPHONE}; - CallbackKey b{"alice", TrackSource::SOURCE_MICROPHONE}; + CallbackKey a{"alice", TrackSource::SOURCE_MICROPHONE, ""}; + CallbackKey b{"alice", TrackSource::SOURCE_MICROPHONE, ""}; CallbackKeyHash hasher; EXPECT_EQ(hasher(a), hasher(b)); } @@ -89,20 +106,22 @@ TEST_F(SubscriptionThreadDispatcherTest, TEST_F(SubscriptionThreadDispatcherTest, CallbackKeyHashDifferentKeysLikelyDifferentHash) { CallbackKeyHash hasher; - CallbackKey mic{"alice", TrackSource::SOURCE_MICROPHONE}; - CallbackKey cam{"alice", TrackSource::SOURCE_CAMERA}; - CallbackKey bob{"bob", TrackSource::SOURCE_MICROPHONE}; + CallbackKey mic{"alice", TrackSource::SOURCE_MICROPHONE, ""}; + CallbackKey cam{"alice", TrackSource::SOURCE_CAMERA, ""}; + CallbackKey bob{"bob", TrackSource::SOURCE_MICROPHONE, ""}; + CallbackKey named{"alice", TrackSource::SOURCE_UNKNOWN, "mic-main"}; EXPECT_NE(hasher(mic), hasher(cam)); EXPECT_NE(hasher(mic), hasher(bob)); + EXPECT_NE(hasher(mic), hasher(named)); } TEST_F(SubscriptionThreadDispatcherTest, CallbackKeyWorksAsUnorderedMapKey) { std::unordered_map map; - CallbackKey k1{"alice", TrackSource::SOURCE_MICROPHONE}; - CallbackKey k2{"bob", TrackSource::SOURCE_CAMERA}; - CallbackKey k3{"alice", TrackSource::SOURCE_CAMERA}; + CallbackKey k1{"alice", TrackSource::SOURCE_MICROPHONE, ""}; + CallbackKey k2{"bob", TrackSource::SOURCE_CAMERA, ""}; + CallbackKey k3{"alice", TrackSource::SOURCE_CAMERA, ""}; map[k1] = 1; map[k2] = 2; @@ -123,8 +142,8 @@ TEST_F(SubscriptionThreadDispatcherTest, CallbackKeyWorksAsUnorderedMapKey) { } TEST_F(SubscriptionThreadDispatcherTest, CallbackKeyEmptyIdentityWorks) { - CallbackKey a{"", TrackSource::SOURCE_UNKNOWN}; - CallbackKey b{"", TrackSource::SOURCE_UNKNOWN}; + CallbackKey a{"", TrackSource::SOURCE_UNKNOWN, ""}; + CallbackKey b{"", TrackSource::SOURCE_UNKNOWN, ""}; CallbackKeyHash hasher; EXPECT_TRUE(a == b); EXPECT_EQ(hasher(a), hasher(b)); @@ -150,6 +169,18 @@ TEST_F(SubscriptionThreadDispatcherTest, SetAudioCallbackStoresRegistration) { EXPECT_EQ(audioCallbacks(dispatcher).size(), 1u); } +TEST_F(SubscriptionThreadDispatcherTest, + SetAudioCallbackByTrackNameStoresRegistration) { + SubscriptionThreadDispatcher dispatcher; + dispatcher.setOnAudioFrameCallback("alice", "mic-main", + [](const AudioFrame &) {}); + + EXPECT_EQ(audioCallbacks(dispatcher).size(), 1u); + EXPECT_EQ(audioCallbacks(dispatcher).count( + CallbackKey{"alice", TrackSource::SOURCE_UNKNOWN, "mic-main"}), + 1u); +} + TEST_F(SubscriptionThreadDispatcherTest, SetVideoCallbackStoresRegistration) { SubscriptionThreadDispatcher dispatcher; dispatcher.setOnVideoFrameCallback("alice", TrackSource::SOURCE_CAMERA, @@ -158,6 +189,18 @@ TEST_F(SubscriptionThreadDispatcherTest, SetVideoCallbackStoresRegistration) { EXPECT_EQ(videoCallbacks(dispatcher).size(), 1u); } +TEST_F(SubscriptionThreadDispatcherTest, + SetVideoCallbackByTrackNameStoresRegistration) { + SubscriptionThreadDispatcher dispatcher; + dispatcher.setOnVideoFrameCallback("alice", "cam-main", + [](const VideoFrame &, std::int64_t) {}); + + EXPECT_EQ(videoCallbacks(dispatcher).size(), 1u); + EXPECT_EQ(videoCallbacks(dispatcher).count( + CallbackKey{"alice", TrackSource::SOURCE_UNKNOWN, "cam-main"}), + 1u); +} + TEST_F(SubscriptionThreadDispatcherTest, ClearAudioCallbackRemovesRegistration) { SubscriptionThreadDispatcher dispatcher; dispatcher.setOnAudioFrameCallback("alice", TrackSource::SOURCE_MICROPHONE, @@ -168,6 +211,17 @@ TEST_F(SubscriptionThreadDispatcherTest, ClearAudioCallbackRemovesRegistration) EXPECT_EQ(audioCallbacks(dispatcher).size(), 0u); } +TEST_F(SubscriptionThreadDispatcherTest, + ClearAudioCallbackByTrackNameRemovesRegistration) { + SubscriptionThreadDispatcher dispatcher; + dispatcher.setOnAudioFrameCallback("alice", "mic-main", + [](const AudioFrame &) {}); + ASSERT_EQ(audioCallbacks(dispatcher).size(), 1u); + + dispatcher.clearOnAudioFrameCallback("alice", "mic-main"); + EXPECT_EQ(audioCallbacks(dispatcher).size(), 0u); +} + TEST_F(SubscriptionThreadDispatcherTest, ClearVideoCallbackRemovesRegistration) { SubscriptionThreadDispatcher dispatcher; dispatcher.setOnVideoFrameCallback("alice", TrackSource::SOURCE_CAMERA, @@ -178,12 +232,25 @@ TEST_F(SubscriptionThreadDispatcherTest, ClearVideoCallbackRemovesRegistration) EXPECT_EQ(videoCallbacks(dispatcher).size(), 0u); } +TEST_F(SubscriptionThreadDispatcherTest, + ClearVideoCallbackByTrackNameRemovesRegistration) { + SubscriptionThreadDispatcher dispatcher; + dispatcher.setOnVideoFrameCallback("alice", "cam-main", + [](const VideoFrame &, std::int64_t) {}); + ASSERT_EQ(videoCallbacks(dispatcher).size(), 1u); + + dispatcher.clearOnVideoFrameCallback("alice", "cam-main"); + EXPECT_EQ(videoCallbacks(dispatcher).size(), 0u); +} + TEST_F(SubscriptionThreadDispatcherTest, ClearNonExistentCallbackIsNoOp) { SubscriptionThreadDispatcher dispatcher; EXPECT_NO_THROW(dispatcher.clearOnAudioFrameCallback( "nobody", TrackSource::SOURCE_MICROPHONE)); EXPECT_NO_THROW( dispatcher.clearOnVideoFrameCallback("nobody", TrackSource::SOURCE_CAMERA)); + EXPECT_NO_THROW(dispatcher.clearOnAudioFrameCallback("nobody", "missing")); + EXPECT_NO_THROW(dispatcher.clearOnVideoFrameCallback("nobody", "missing")); } TEST_F(SubscriptionThreadDispatcherTest, OverwriteAudioCallbackKeepsSingleEntry) { @@ -212,6 +279,17 @@ TEST_F(SubscriptionThreadDispatcherTest, OverwriteVideoCallbackKeepsSingleEntry) EXPECT_EQ(videoCallbacks(dispatcher).size(), 1u); } +TEST_F(SubscriptionThreadDispatcherTest, + OverwriteTrackNameAudioCallbackKeepsSingleEntry) { + SubscriptionThreadDispatcher dispatcher; + dispatcher.setOnAudioFrameCallback("alice", "mic-main", + [](const AudioFrame &) {}); + dispatcher.setOnAudioFrameCallback("alice", "mic-main", + [](const AudioFrame &) {}); + + EXPECT_EQ(audioCallbacks(dispatcher).size(), 1u); +} + TEST_F(SubscriptionThreadDispatcherTest, MultipleDistinctCallbacksAreIndependent) { SubscriptionThreadDispatcher dispatcher; @@ -246,10 +324,26 @@ TEST_F(SubscriptionThreadDispatcherTest, ClearingOneSourceDoesNotAffectOther) { TrackSource::SOURCE_MICROPHONE); EXPECT_EQ(audioCallbacks(dispatcher).size(), 1u); - CallbackKey remaining{"alice", TrackSource::SOURCE_SCREENSHARE_AUDIO}; + CallbackKey remaining{"alice", TrackSource::SOURCE_SCREENSHARE_AUDIO, ""}; EXPECT_EQ(audioCallbacks(dispatcher).count(remaining), 1u); } +TEST_F(SubscriptionThreadDispatcherTest, + SourceAndTrackNameAudioCallbacksAreIndependent) { + SubscriptionThreadDispatcher dispatcher; + dispatcher.setOnAudioFrameCallback("alice", TrackSource::SOURCE_MICROPHONE, + [](const AudioFrame &) {}); + dispatcher.setOnAudioFrameCallback("alice", "mic-main", + [](const AudioFrame &) {}); + ASSERT_EQ(audioCallbacks(dispatcher).size(), 2u); + + dispatcher.clearOnAudioFrameCallback("alice", "mic-main"); + EXPECT_EQ(audioCallbacks(dispatcher).size(), 1u); + EXPECT_EQ(audioCallbacks(dispatcher).count( + CallbackKey{"alice", TrackSource::SOURCE_MICROPHONE, ""}), + 1u); +} + // ============================================================================ // Active readers state (no real streams, just map state) // ============================================================================ @@ -380,4 +474,264 @@ TEST_F(SubscriptionThreadDispatcherTest, ManyDistinctCallbacksCanBeRegistered) { EXPECT_EQ(audioCallbacks(dispatcher).size(), 0u); } +// ============================================================================ +// DataCallbackKey equality +// ============================================================================ + +TEST_F(SubscriptionThreadDispatcherTest, DataCallbackKeyEqualKeysCompareEqual) { + DataCallbackKey a{"alice", "my-track"}; + DataCallbackKey b{"alice", "my-track"}; + EXPECT_TRUE(a == b); +} + +TEST_F(SubscriptionThreadDispatcherTest, + DataCallbackKeyDifferentIdentityNotEqual) { + DataCallbackKey a{"alice", "my-track"}; + DataCallbackKey b{"bob", "my-track"}; + EXPECT_FALSE(a == b); +} + +TEST_F(SubscriptionThreadDispatcherTest, + DataCallbackKeyDifferentTrackNameNotEqual) { + DataCallbackKey a{"alice", "track-a"}; + DataCallbackKey b{"alice", "track-b"}; + EXPECT_FALSE(a == b); +} + +// ============================================================================ +// DataCallbackKeyHash +// ============================================================================ + +TEST_F(SubscriptionThreadDispatcherTest, + DataCallbackKeyHashEqualKeysProduceSameHash) { + DataCallbackKey a{"alice", "my-track"}; + DataCallbackKey b{"alice", "my-track"}; + DataCallbackKeyHash hasher; + EXPECT_EQ(hasher(a), hasher(b)); +} + +TEST_F(SubscriptionThreadDispatcherTest, + DataCallbackKeyHashDifferentKeysLikelyDifferentHash) { + DataCallbackKeyHash hasher; + DataCallbackKey a{"alice", "track-a"}; + DataCallbackKey b{"alice", "track-b"}; + DataCallbackKey c{"bob", "track-a"}; + EXPECT_NE(hasher(a), hasher(b)); + EXPECT_NE(hasher(a), hasher(c)); +} + +TEST_F(SubscriptionThreadDispatcherTest, + DataCallbackKeyWorksAsUnorderedMapKey) { + std::unordered_map map; + + DataCallbackKey k1{"alice", "track-a"}; + DataCallbackKey k2{"bob", "track-b"}; + DataCallbackKey k3{"alice", "track-b"}; + + map[k1] = 1; + map[k2] = 2; + map[k3] = 3; + + EXPECT_EQ(map.size(), 3u); + EXPECT_EQ(map[k1], 1); + EXPECT_EQ(map[k2], 2); + EXPECT_EQ(map[k3], 3); + + map[k1] = 42; + EXPECT_EQ(map[k1], 42); + EXPECT_EQ(map.size(), 3u); + + map.erase(k2); + EXPECT_EQ(map.size(), 2u); + EXPECT_EQ(map.count(k2), 0u); +} + +// ============================================================================ +// Data callback registration and clearing +// ============================================================================ + +TEST_F(SubscriptionThreadDispatcherTest, + AddDataFrameCallbackStoresRegistration) { + SubscriptionThreadDispatcher dispatcher; + auto id = dispatcher.addOnDataFrameCallback( + "alice", "my-track", + [](const std::vector &, std::optional) {}); + + EXPECT_NE(id, 0u); + EXPECT_EQ(dataCallbacks(dispatcher).size(), 1u); +} + +TEST_F(SubscriptionThreadDispatcherTest, + RemoveDataFrameCallbackRemovesRegistration) { + SubscriptionThreadDispatcher dispatcher; + auto id = dispatcher.addOnDataFrameCallback( + "alice", "my-track", + [](const std::vector &, std::optional) {}); + ASSERT_EQ(dataCallbacks(dispatcher).size(), 1u); + + dispatcher.removeOnDataFrameCallback(id); + EXPECT_EQ(dataCallbacks(dispatcher).size(), 0u); +} + +TEST_F(SubscriptionThreadDispatcherTest, + RemoveNonExistentDataCallbackIsNoOp) { + SubscriptionThreadDispatcher dispatcher; + EXPECT_NO_THROW(dispatcher.removeOnDataFrameCallback(999)); +} + +TEST_F(SubscriptionThreadDispatcherTest, + MultipleDataCallbacksForSameKeyAreIndependent) { + SubscriptionThreadDispatcher dispatcher; + auto cb = [](const std::vector &, + std::optional) {}; + auto id1 = dispatcher.addOnDataFrameCallback("alice", "track", cb); + auto id2 = dispatcher.addOnDataFrameCallback("alice", "track", cb); + + EXPECT_NE(id1, id2); + EXPECT_EQ(dataCallbacks(dispatcher).size(), 2u); + + dispatcher.removeOnDataFrameCallback(id1); + EXPECT_EQ(dataCallbacks(dispatcher).size(), 1u); +} + +TEST_F(SubscriptionThreadDispatcherTest, + DataCallbackIdsAreMonotonicallyIncreasing) { + SubscriptionThreadDispatcher dispatcher; + auto cb = [](const std::vector &, + std::optional) {}; + auto id1 = dispatcher.addOnDataFrameCallback("alice", "t1", cb); + auto id2 = dispatcher.addOnDataFrameCallback("bob", "t2", cb); + auto id3 = dispatcher.addOnDataFrameCallback("carol", "t3", cb); + + EXPECT_LT(id1, id2); + EXPECT_LT(id2, id3); +} + +// ============================================================================ +// Data track active readers (no real tracks, just map state) +// ============================================================================ + +TEST_F(SubscriptionThreadDispatcherTest, NoActiveDataReadersInitially) { + SubscriptionThreadDispatcher dispatcher; + EXPECT_TRUE(activeDataReaders(dispatcher).empty()); +} + +TEST_F(SubscriptionThreadDispatcherTest, + ActiveDataReadersEmptyAfterCallbackRegistration) { + SubscriptionThreadDispatcher dispatcher; + dispatcher.addOnDataFrameCallback( + "alice", "my-track", + [](const std::vector &, std::optional) {}); + EXPECT_TRUE(activeDataReaders(dispatcher).empty()) + << "Registering a callback without a published track should not spawn " + "readers"; +} + +TEST_F(SubscriptionThreadDispatcherTest, NoRemoteDataTracksInitially) { + SubscriptionThreadDispatcher dispatcher; + EXPECT_TRUE(remoteDataTracks(dispatcher).empty()); +} + +// ============================================================================ +// Data track destruction safety +// ============================================================================ + +TEST_F(SubscriptionThreadDispatcherTest, + DestroyDispatcherWithDataCallbacksIsSafe) { + EXPECT_NO_THROW({ + SubscriptionThreadDispatcher dispatcher; + dispatcher.addOnDataFrameCallback( + "alice", "track-a", + [](const std::vector &, + std::optional) {}); + dispatcher.addOnDataFrameCallback( + "bob", "track-b", + [](const std::vector &, + std::optional) {}); + }); +} + +TEST_F(SubscriptionThreadDispatcherTest, + DestroyDispatcherAfterRemovingDataCallbacksIsSafe) { + EXPECT_NO_THROW({ + SubscriptionThreadDispatcher dispatcher; + auto id = dispatcher.addOnDataFrameCallback( + "alice", "track-a", + [](const std::vector &, + std::optional) {}); + dispatcher.removeOnDataFrameCallback(id); + }); +} + +// ============================================================================ +// Mixed audio/video/data registration +// ============================================================================ + +TEST_F(SubscriptionThreadDispatcherTest, + MixedAudioVideoDataCallbacksAreIndependent) { + SubscriptionThreadDispatcher dispatcher; + dispatcher.setOnAudioFrameCallback("alice", TrackSource::SOURCE_MICROPHONE, + [](const AudioFrame &) {}); + dispatcher.setOnVideoFrameCallback("alice", TrackSource::SOURCE_CAMERA, + [](const VideoFrame &, std::int64_t) {}); + dispatcher.addOnDataFrameCallback( + "alice", "data-track", + [](const std::vector &, std::optional) {}); + + EXPECT_EQ(audioCallbacks(dispatcher).size(), 1u); + EXPECT_EQ(videoCallbacks(dispatcher).size(), 1u); + EXPECT_EQ(dataCallbacks(dispatcher).size(), 1u); +} + +TEST_F(SubscriptionThreadDispatcherTest, + StopAllClearsDataCallbacksAndReaders) { + SubscriptionThreadDispatcher dispatcher; + dispatcher.addOnDataFrameCallback( + "alice", "track-a", + [](const std::vector &, std::optional) {}); + dispatcher.addOnDataFrameCallback( + "bob", "track-b", + [](const std::vector &, std::optional) {}); + + dispatcher.stopAll(); + + EXPECT_EQ(dataCallbacks(dispatcher).size(), 0u); + EXPECT_TRUE(activeDataReaders(dispatcher).empty()); + EXPECT_TRUE(remoteDataTracks(dispatcher).empty()); +} + +// ============================================================================ +// Concurrent data callback registration +// ============================================================================ + +TEST_F(SubscriptionThreadDispatcherTest, + ConcurrentDataCallbackRegistrationDoesNotCrash) { + SubscriptionThreadDispatcher dispatcher; + constexpr int kThreads = 8; + constexpr int kIterations = 100; + + std::vector threads; + threads.reserve(kThreads); + + for (int t = 0; t < kThreads; ++t) { + threads.emplace_back([&dispatcher, t]() { + for (int i = 0; i < kIterations; ++i) { + auto id = dispatcher.addOnDataFrameCallback( + "participant-" + std::to_string(t), "track", + [](const std::vector &, + std::optional) {}); + dispatcher.removeOnDataFrameCallback(id); + } + }); + } + + for (auto &thread : threads) { + thread.join(); + } + + EXPECT_TRUE(dataCallbacks(dispatcher).empty()) + << "All data callbacks should be cleared after concurrent " + "register/remove"; +} + } // namespace livekit diff --git a/src/tests/stress/test_latency_measurement.cpp b/src/tests/stress/test_latency_measurement.cpp index ed988b79..c60e9680 100644 --- a/src/tests/stress/test_latency_measurement.cpp +++ b/src/tests/stress/test_latency_measurement.cpp @@ -343,6 +343,9 @@ TEST_F(LatencyMeasurementTest, AudioLatency) { } // Clean up + ASSERT_NE(audio_track, nullptr) << "Audio track is null"; + ASSERT_NE(audio_track->publication(), nullptr) + << "Audio track publication is null"; sender_room->localParticipant()->unpublishTrack( audio_track->publication()->sid());
LiveKit Ecosystem