@@ -228,102 +228,128 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi
228228 }
229229}
230230
231- auto DataProcessingHelpers::routeForwardedMessages (FairMQDeviceProxy& proxy,
232- std::vector<MessageSet>& currentSetOfInputs,
233- const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
231+ void DataProcessingHelpers::routeForwardedMessages (FairMQDeviceProxy& proxy, std::span<fair::mq::MessagePtr>& messages, std::vector<fair::mq::Parts>& forwardedParts,
232+ const bool copyByDefault, bool consume)
234233{
235- // we collect all messages per forward in a map and send them together
236- std::vector<fair::mq::Parts> forwardedParts;
237- forwardedParts.resize (proxy.getNumForwards ());
238- std::vector<ChannelIndex> forwardingChoices{};
239234 O2_SIGNPOST_ID_GENERATE (sid, forwarding);
235+ std::vector<ChannelIndex> forwardingChoices{};
236+ size_t pi = 0 ;
237+ while (pi < messages.size ()) {
238+ auto & header = messages[pi];
240239
241- for (size_t ii = 0 , ie = currentSetOfInputs.size (); ii < ie; ++ii) {
242- auto & messageSet = currentSetOfInputs[ii];
240+ // If is now possible that the record is not complete when
241+ // we forward it, because of a custom completion policy.
242+ // this means that we need to skip the empty entries in the
243+ // record for being forwarded.
244+ if (header->GetData () == nullptr ) {
245+ pi += 2 ;
246+ continue ;
247+ }
248+ auto dih = o2::header::get<DomainInfoHeader*>(header->GetData ());
249+ if (dih) {
250+ pi += 2 ;
251+ continue ;
252+ }
253+ auto sih = o2::header::get<SourceInfoHeader*>(header->GetData ());
254+ if (sih) {
255+ pi += 2 ;
256+ continue ;
257+ }
243258
244- for ( size_t pi = 0 ; pi < messageSet. size (); ++pi) {
245- auto & header = messageSet. header (pi );
259+ auto dph = o2::header::get<DataProcessingHeader*>(header-> GetData ());
260+ auto dh = o2:: header::get<o2::header::DataHeader*>(header-> GetData () );
246261
247- // If is now possible that the record is not complete when
248- // we forward it, because of a custom completion policy.
249- // this means that we need to skip the empty entries in the
250- // record for being forwarded.
251- if (header->GetData () == nullptr ) {
252- continue ;
253- }
254- auto dih = o2::header::get<DomainInfoHeader*>(header->GetData ());
255- if (dih) {
256- continue ;
257- }
258- auto sih = o2::header::get<SourceInfoHeader*>(header->GetData ());
259- if (sih) {
260- continue ;
261- }
262+ if (dph == nullptr || dh == nullptr ) {
263+ // Complain only if this is not an out-of-band message
264+ LOGP (error, " Data is missing {}{}{}" ,
265+ dph ? " DataProcessingHeader" : " " , dph || dh ? " and" : " " , dh ? " DataHeader" : " " );
266+ pi += 2 ;
267+ continue ;
268+ }
262269
263- auto dph = o2::header::get<DataProcessingHeader*>(header->GetData ());
264- auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData ());
270+ // At least one payload.
271+ auto & payload = messages[pi + 1 ];
272+ // Calculate the number of messages which should be handled together
273+ // all in one go.
274+ size_t numberOfMessages = 0 ;
275+ if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex ) {
276+ // Sequence of (header, payload[0], ... , payload[splitPayloadParts - 1]) pairs belonging together.
277+ numberOfMessages = dh->splitPayloadParts + 1 ; // one is for the header
278+ } else {
279+ // Sequence of splitPayloadParts (header, payload) pairs belonging together.
280+ // In case splitPayloadParts = 0, we consider this as a single message pair
281+ numberOfMessages = (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1 ) * 2 ;
282+ }
265283
266- if (dph == nullptr || dh == nullptr ) {
267- // Complain only if this is not an out-of-band message
268- LOGP (error, " Data is missing {}{}{}" ,
269- dph ? " DataProcessingHeader" : " " , dph || dh ? " and" : " " , dh ? " DataHeader" : " " );
270- continue ;
271- }
284+ if (payload.get () == nullptr && consume == true ) {
285+ // If the payload is not there, it means we already
286+ // processed it with ConsumeExisiting. Therefore we
287+ // need to do something only if this is the last consume.
288+ header.reset (nullptr );
289+ pi += numberOfMessages;
290+ continue ;
291+ }
272292
273- auto & payload = messageSet.payload (pi);
293+ // We need to find the forward route only for the first
294+ // part of a split payload. All the others will use the same.
295+ // Therefore, we reset and recompute the forwarding choice:
296+ //
297+ // - If this is the first payload of a [header0][payload0][header0][payload1]... sequence,
298+ // which is actually always created and handled together. Notice that in this
299+ // case we have splitPayloadParts == splitPayloadIndex
300+ // - If this is the first payload of a [header0][payload0][header1][payload1]... sequence
301+ // belonging to the same multipart message (and therefore we are guaranteed that they
302+ // need to be routed together).
303+ // - If the message is not a multipart (splitPayloadParts 0) or has only one part
304+ // - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
305+ // we will already use the same choice in the for loop below.
306+ //
274307
275- if (payload.get () == nullptr && consume == true ) {
276- // If the payload is not there, it means we already
277- // processed it with ConsumeExisiting. Therefore we
278- // need to do something only if this is the last consume.
279- header.reset (nullptr );
280- continue ;
281- }
308+ forwardingChoices.clear ();
309+ proxy.getMatchingForwardChannelIndexes (forwardingChoices, *dh, dph->startTime );
282310
283- // We need to find the forward route only for the first
284- // part of a split payload. All the others will use the same.
285- // Therefore, we reset and recompute the forwarding choice:
286- //
287- // - If this is the first payload of a [header0][payload0][header0][payload1] sequence,
288- // which is actually always created and handled together
289- // - If the message is not a multipart (splitPayloadParts 0) or has only one part
290- // - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
291- // we will already use the same choice in the for loop below.
292- if (dh->splitPayloadIndex == 0 || dh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads (pi) > 0 ) {
293- forwardingChoices.clear ();
294- proxy.getMatchingForwardChannelIndexes (forwardingChoices, *dh, dph->startTime );
295- }
311+ if (forwardingChoices.empty ()) {
312+ // Nothing to forward go to the next messageset
313+ pi += numberOfMessages;
314+ continue ;
315+ }
296316
297- if (forwardingChoices.empty ()) {
298- // Nothing to forward go to the next messageset
299- continue ;
300- }
317+ // In case of more than one forward route, we need to copy the message.
318+ // This will eventually use the same memory if running with the same backend.
319+ if (copyByDefault || forwardingChoices.size () > 1 ) {
320+ for (auto & choice : forwardingChoices) {
321+ O2_SIGNPOST_EVENT_EMIT (forwarding, sid, " forwardInputs" , " Forwarding a copy of %{public}s to route %d." ,
322+ fmt::format (" {}/{}/{}@timeslice:{} tfCounter:{}" , dh->dataOrigin , dh->dataDescription , dh->subSpecification , dph->startTime , dh->tfCounter ).c_str (), choice.value );
301323
302- // In case of more than one forward route, we need to copy the message.
303- // This will eventually use the same memory if running with the same backend.
304- if (copyByDefault || forwardingChoices.size () > 1 ) {
305- for (auto & choice : forwardingChoices) {
306- auto && newHeader = header->GetTransport ()->CreateMessage ();
307- O2_SIGNPOST_EVENT_EMIT (forwarding, sid, " forwardInputs" , " Forwarding a copy of %{public}s to route %d." ,
308- fmt::format (" {}/{}/{}@timeslice:{} tfCounter:{}" , dh->dataOrigin , dh->dataDescription , dh->subSpecification , dph->startTime , dh->tfCounter ).c_str (), choice.value );
309- newHeader->Copy (*header);
310- forwardedParts[choice.value ].AddPart (std::move (newHeader));
311-
312- for (size_t payloadIndex = 0 ; payloadIndex < messageSet.getNumberOfPayloads (pi); ++payloadIndex) {
313- auto && newPayload = header->GetTransport ()->CreateMessage ();
314- newPayload->Copy (*messageSet.payload (pi, payloadIndex));
315- forwardedParts[choice.value ].AddPart (std::move (newPayload));
316- }
317- }
318- } else {
319- O2_SIGNPOST_EVENT_EMIT (forwarding, sid, " forwardInputs" , " Forwarding %{public}s to route %d." ,
320- fmt::format (" {}/{}/{}@timeslice:{} tfCounter:{}" , dh->dataOrigin , dh->dataDescription , dh->subSpecification , dph->startTime , dh->tfCounter ).c_str (), forwardingChoices.back ().value );
321- forwardedParts[forwardingChoices.back ().value ].AddPart (std::move (messageSet.header (pi)));
322- for (size_t payloadIndex = 0 ; payloadIndex < messageSet.getNumberOfPayloads (pi); ++payloadIndex) {
323- forwardedParts[forwardingChoices.back ().value ].AddPart (std::move (messageSet.payload (pi, payloadIndex)));
324+ for (size_t ppi = pi; ppi < pi + numberOfMessages; ++ppi) {
325+ auto && newMsg = header->GetTransport ()->CreateMessage ();
326+ newMsg->Copy (*messages[ppi]);
327+ forwardedParts[choice.value ].AddPart (std::move (newMsg));
324328 }
325329 }
330+ } else {
331+ O2_SIGNPOST_EVENT_EMIT (forwarding, sid, " forwardInputs" , " Forwarding %{public}s to route %d." ,
332+ fmt::format (" {}/{}/{}@timeslice:{} tfCounter:{}" , dh->dataOrigin , dh->dataDescription , dh->subSpecification , dph->startTime , dh->tfCounter ).c_str (), forwardingChoices.back ().value );
333+ for (size_t ppi = pi; ppi < pi + numberOfMessages; ++ppi) {
334+ forwardedParts[forwardingChoices.back ().value ].AddPart (std::move (messages[ppi]));
335+ }
326336 }
337+ pi += numberOfMessages;
338+ }
339+ }
340+
341+ auto DataProcessingHelpers::routeForwardedMessageSet (FairMQDeviceProxy& proxy,
342+ std::vector<MessageSet>& currentSetOfInputs,
343+ const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
344+ {
345+ // we collect all messages per forward in a map and send them together
346+ std::vector<fair::mq::Parts> forwardedParts;
347+ forwardedParts.resize (proxy.getNumForwards ());
348+ std::vector<ChannelIndex> forwardingChoices{};
349+
350+ for (size_t ii = 0 , ie = currentSetOfInputs.size (); ii < ie; ++ii) {
351+ auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii].messages );
352+ routeForwardedMessages (proxy, span, forwardedParts, copyByDefault, consume);
327353 }
328354 return forwardedParts;
329355};
0 commit comments