diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index 7eb851e2aadd8..4cda75ed001b0 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -213,9 +213,9 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector(header->GetData()), reinterpret_cast(payload ? payload->GetData() : nullptr), @@ -786,9 +786,9 @@ void DataRelayer::getReadyToProcess(std::vector& comp auto partial = getPartialRecord(li); // TODO: get the data ref from message model auto getter = [&partial](size_t idx, size_t part) { - if (!partial[idx].messages.empty() && partial[idx].header(part).get()) { - auto header = partial[idx].header(part).get(); - auto payload = partial[idx].payload(part).get(); + if (!partial[idx].messages.empty() && (partial[idx].messages | get_header{part}).get()) { + auto header = (partial[idx].messages | get_header{part}).get(); + auto payload = (partial[idx].messages | get_payload{part, 0}).get(); return DataRef{nullptr, reinterpret_cast(header->GetData()), reinterpret_cast(payload ? payload->GetData() : nullptr), @@ -952,10 +952,10 @@ std::vector DataRelayer::consumeExistingInputsForTime // TODO: in the original implementation of the cache, there have been only two messages per entry, // check if the 2 above corresponds to the number of messages. for (size_t pi = 0; pi < (cache[cacheId].messages | count_parts{}); pi++) { - auto& header = cache[cacheId].header(pi); + auto& header = cache[cacheId].messages | get_header{pi}; auto&& newHeader = header->GetTransport()->CreateMessage(); newHeader->Copy(*header); - messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].payload(pi))}); + messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].messages | get_payload{pi, 0})}); } }; diff --git a/Framework/Core/test/test_MessageSet.cxx b/Framework/Core/test/test_MessageSet.cxx index 290e55220d6cb..aa7b49c1d1d3c 100644 --- a/Framework/Core/test/test_MessageSet.cxx +++ b/Framework/Core/test/test_MessageSet.cxx @@ -250,6 +250,14 @@ TEST_CASE("GetHeaderPayloadOperators") auto& pl1 = msgSet.messages | get_payload{1, 0}; REQUIRE(pl1.get() != nullptr); REQUIRE(pl1->GetSize() == 200); + + // Validate pipe operators match old API + for (size_t i = 0; i < 2; ++i) { + REQUIRE(&(msgSet.messages | get_header{i}) == &msgSet.header(i)); + REQUIRE(&(msgSet.messages | get_payload{i, 0}) == &msgSet.payload(i, 0)); + } + REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); + REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); } TEST_CASE("GetHeaderPayloadMultiPayload")