From 7d1f10fe2f0e65fece13a1d8e64aeeeeda09e67e Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Thu, 26 Mar 2026 12:14:27 +0100 Subject: [PATCH] DPL: make sure DataRelayer benchmark works again --- Framework/Core/test/benchmark_DataRelayer.cxx | 68 +++++++++++++++---- 1 file changed, 53 insertions(+), 15 deletions(-) diff --git a/Framework/Core/test/benchmark_DataRelayer.cxx b/Framework/Core/test/benchmark_DataRelayer.cxx index e983f3604cfab..312711d73e95e 100644 --- a/Framework/Core/test/benchmark_DataRelayer.cxx +++ b/Framework/Core/test/benchmark_DataRelayer.cxx @@ -15,10 +15,17 @@ #include "Framework/CompletionPolicyHelpers.h" #include "Framework/DataRelayer.h" #include "Framework/DataProcessingHeader.h" +#include "Framework/DataProcessingStates.h" +#include "Framework/DataProcessingStats.h" +#include "Framework/DeviceState.h" +#include "Framework/DriverConfig.h" +#include "Framework/ServiceRegistryHelpers.h" +#include "Framework/TimingHelpers.h" #include #include #include #include +#include using Monitoring = o2::monitoring::Monitoring; using namespace o2::framework; @@ -26,6 +33,42 @@ using DataHeader = o2::header::DataHeader; using Stack = o2::header::Stack; using RecordAction = o2::framework::DataRelayer::RecordAction; +struct BenchmarkServices { + Monitoring monitoring; + const DriverConfig driverConfig{.batch = false}; + DataProcessingStates states{ + TimingHelpers::defaultRealtimeBaseConfigurator(0, uv_default_loop()), + TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop())}; + DataProcessingStats stats{ + TimingHelpers::defaultRealtimeBaseConfigurator(0, uv_default_loop()), + TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), + {}}; + DeviceState deviceState; + ServiceRegistry registry; + + ServiceRegistryRef ref() + { + using MetricSpec = DataProcessingStats::MetricSpec; + int quickUpdateInterval = 1; + std::vector specs{ + MetricSpec{.name = "malformed_inputs", .metricId = static_cast(ProcessingStatsId::MALFORMED_INPUTS), .minPublishInterval = quickUpdateInterval}, + MetricSpec{.name = "dropped_computations", .metricId = static_cast(ProcessingStatsId::DROPPED_COMPUTATIONS), .minPublishInterval = quickUpdateInterval}, + MetricSpec{.name = "dropped_incoming_messages", .metricId = static_cast(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .minPublishInterval = quickUpdateInterval}, + MetricSpec{.name = "relayed_messages", .metricId = static_cast(ProcessingStatsId::RELAYED_MESSAGES), .minPublishInterval = quickUpdateInterval}}; + for (auto& spec : specs) { + stats.registerMetric(spec); + } + + ServiceRegistryRef r{registry}; + r.registerService(ServiceRegistryHelpers::handleForService(&monitoring)); + r.registerService(ServiceRegistryHelpers::handleForService(&states)); + r.registerService(ServiceRegistryHelpers::handleForService(&stats)); + r.registerService(ServiceRegistryHelpers::handleForService(&driverConfig)); + r.registerService(ServiceRegistryHelpers::handleForService(&deviceState)); + return r; + } +}; + // a simple benchmark of the contribution of the pure message creation // this was important when the benchmarks below included the message // creation inside the benchmark loop, its somewhat obsolete now but @@ -54,7 +97,7 @@ BENCHMARK(BM_RelayMessageCreation); // and the subsequent InputRecord is immediately requested. static void BM_RelaySingleSlot(benchmark::State& state) { - Monitoring metrics; + BenchmarkServices services; InputSpec spec{"clusters", "TPC", "CLUSTERS"}; std::vector inputs = { @@ -64,8 +107,7 @@ static void BM_RelaySingleSlot(benchmark::State& state) std::vector infos{1}; TimesliceIndex index{1, infos}; auto policy = CompletionPolicyHelpers::consumeWhenAny(); - ServiceRegistry registry; - DataRelayer relayer(policy, inputs, index, {registry}, -1); + DataRelayer relayer(policy, inputs, index, services.ref(), -1); relayer.setPipelineLength(4); // Let's create a dummy O2 Message with two headers in the stack: @@ -106,7 +148,7 @@ BENCHMARK(BM_RelaySingleSlot); // This one will simulate a single input. static void BM_RelayMultipleSlots(benchmark::State& state) { - Monitoring metrics; + BenchmarkServices services; InputSpec spec{"clusters", "TPC", "CLUSTERS"}; std::vector inputs = { @@ -117,8 +159,7 @@ static void BM_RelayMultipleSlots(benchmark::State& state) TimesliceIndex index{1, infos}; auto policy = CompletionPolicyHelpers::consumeWhenAny(); - ServiceRegistry registry; - DataRelayer relayer(policy, inputs, index, {registry}, -1); + DataRelayer relayer(policy, inputs, index, services.ref(), -1); relayer.setPipelineLength(4); // Let's create a dummy O2 Message with two headers in the stack: @@ -163,7 +204,7 @@ BENCHMARK(BM_RelayMultipleSlots); /// In this case we have a record with two entries static void BM_RelayMultipleRoutes(benchmark::State& state) { - Monitoring metrics; + BenchmarkServices services; InputSpec spec1{"clusters", "TPC", "CLUSTERS"}; InputSpec spec2{"tracks", "TPC", "TRACKS"}; @@ -176,8 +217,7 @@ static void BM_RelayMultipleRoutes(benchmark::State& state) TimesliceIndex index{1, infos}; auto policy = CompletionPolicyHelpers::consumeWhenAny(); - ServiceRegistry registry; - DataRelayer relayer(policy, inputs, index, {registry}, -1); + DataRelayer relayer(policy, inputs, index, services.ref(), -1); relayer.setPipelineLength(4); // Let's create a dummy O2 Message with two headers in the stack: @@ -241,7 +281,7 @@ BENCHMARK(BM_RelayMultipleRoutes); /// In this case we have a record with two entries static void BM_RelaySplitParts(benchmark::State& state) { - Monitoring metrics; + BenchmarkServices services; InputSpec spec1{"clusters", "TPC", "CLUSTERS"}; std::vector inputs = { @@ -253,8 +293,7 @@ static void BM_RelaySplitParts(benchmark::State& state) TimesliceIndex index{1, infos}; auto policy = CompletionPolicyHelpers::consumeWhenAny(); - ServiceRegistry registry; - DataRelayer relayer(policy, inputs, index, {registry}, -1); + DataRelayer relayer(policy, inputs, index, services.ref(), -1); relayer.setPipelineLength(4); // Let's create a dummy O2 Message with two headers in the stack: @@ -301,7 +340,7 @@ BENCHMARK(BM_RelaySplitParts)->Arg(10)->Arg(100)->Arg(1000); static void BM_RelayMultiplePayloads(benchmark::State& state) { - Monitoring metrics; + BenchmarkServices services; InputSpec spec1{"clusters", "TPC", "CLUSTERS"}; std::vector inputs = { @@ -313,8 +352,7 @@ static void BM_RelayMultiplePayloads(benchmark::State& state) TimesliceIndex index{1, infos}; auto policy = CompletionPolicyHelpers::consumeWhenAny(); - ServiceRegistry registry; - DataRelayer relayer(policy, inputs, index, {registry}, -1); + DataRelayer relayer(policy, inputs, index, services.ref(), -1); relayer.setPipelineLength(4); // DataHeader matching the one provided in the input