Skip to content

Commit 349a25e

Browse files
committed
DPL: move away from MessageSet::header / payload
Abstract header / payload retrieval, with the idea that get_header / get_payload will work on any range of fair::mq::MessagePtrs. For now we only do the first header / payload pair only, to validate the trivial change.
1 parent 5768b11 commit 349a25e

File tree

4 files changed

+13
-13
lines changed

4 files changed

+13
-13
lines changed

Framework/Core/include/Framework/DataModelViews.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ struct get_header {
153153
// ends the pipeline, returns the number of parts
154154
template <typename R>
155155
requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
156-
friend fair::mq::MessagePtr& operator|(R&& r, get_header self)
156+
friend auto& operator|(R&& r, get_header self)
157157
{
158158
return r[(r | get_dataref_indices{self.id, 0}).headerIdx];
159159
}
@@ -165,7 +165,7 @@ struct get_payload {
165165
// ends the pipeline, returns the number of parts
166166
template <typename R>
167167
requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
168-
friend fair::mq::MessagePtr& operator|(R&& r, get_payload self)
168+
friend auto& operator|(R&& r, get_payload self)
169169
{
170170
return r[(r | get_dataref_indices{self.part, self.subPart}).payloadIdx];
171171
}

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2153,7 +2153,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
21532153
return currentSetOfInputs[i].getNumberOfPairs();
21542154
};
21552155
auto refCountGetter = [&currentSetOfInputs](size_t idx) -> int {
2156-
auto& header = static_cast<const fair::mq::shmem::Message&>(*currentSetOfInputs[idx].header(0));
2156+
auto& header = static_cast<const fair::mq::shmem::Message&>(*(currentSetOfInputs[idx].messages | get_header{0}));
21572157
return header.GetRefCount();
21582158
};
21592159
return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.size()};

Framework/Core/src/DataRelayer.cxx

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -184,11 +184,11 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
184184
// We check that no data is already there for the given cell
185185
// it is enough to check the first element
186186
auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
187-
if (!part.messages.empty() && part.header(0) != nullptr) {
187+
if (!part.messages.empty() && (part.messages | get_header{0}) != nullptr) {
188188
headerPresent++;
189189
continue;
190190
}
191-
if (!part.messages.empty() && part.payload(0) != nullptr) {
191+
if (!part.messages.empty() && (part.messages | get_payload{0, 0}) != nullptr) {
192192
payloadPresent++;
193193
continue;
194194
}
@@ -227,7 +227,7 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
227227
return partial[idx].messages | count_parts{};
228228
};
229229
auto refCountGetter = [&partial](size_t idx) -> int {
230-
auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
230+
auto& header = static_cast<const fair::mq::shmem::Message&>(*(partial[idx].messages | get_header{0}));
231231
return header.GetRefCount();
232232
};
233233
InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
@@ -246,8 +246,8 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
246246
activity.expiredSlots++;
247247

248248
mTimesliceIndex.markAsDirty(slot, true);
249-
assert(part.header(0) != nullptr);
250-
assert(part.payload(0) != nullptr);
249+
assert((part.messages | get_header{0}) != nullptr);
250+
assert((part.messages | get_payload{0, 0}) != nullptr);
251251
}
252252
}
253253
LOGP(debug, "DataRelayer::processDanglingInputs headerPresent:{}, payloadPresent:{}, noCheckers:{}, badSlot:{}, checkerDenied:{}",
@@ -800,7 +800,7 @@ void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& comp
800800
return partial[idx].messages | count_parts{};
801801
};
802802
auto refCountGetter = [&partial](size_t idx) -> int {
803-
auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
803+
auto& header = static_cast<const fair::mq::shmem::Message&>(*(partial[idx].messages | get_header{0}));
804804
return header.GetRefCount();
805805
};
806806
InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};

Framework/Core/test/test_DataRelayer.cxx

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -798,11 +798,11 @@ TEST_CASE("DataRelayer")
798798
// one message set containing number of added sequences of messages
799799
REQUIRE((messageSet[0].messages | count_parts{}) == sequenceSize.size());
800800
size_t counter = 0;
801-
for (auto seqid = 0; seqid < sequenceSize.size(); ++seqid) {
801+
for (size_t seqid = 0; seqid < sequenceSize.size(); ++seqid) {
802802
REQUIRE(messageSet[0].getNumberOfPayloads(seqid) == sequenceSize[seqid]);
803-
for (auto pi = 0; pi < messageSet[0].getNumberOfPayloads(seqid); ++pi) {
804-
REQUIRE(messageSet[0].payload(seqid, pi));
805-
auto const* data = messageSet[0].payload(seqid, pi)->GetData();
803+
for (size_t pi = 0; pi < messageSet[0].getNumberOfPayloads(seqid); ++pi) {
804+
REQUIRE((messageSet[0].messages | get_payload{seqid, pi}));
805+
auto const* data = (messageSet[0].messages | get_payload{seqid, pi})->GetData();
806806
REQUIRE(*(reinterpret_cast<size_t const*>(data)) == counter);
807807
++counter;
808808
}

0 commit comments

Comments
 (0)