Skip to content

Commit c366949

Browse files
committed
DPL Analysis: use Input/OutputSpec metadata to detect special tables instead of origins
1 parent 78b931b commit c366949

File tree

8 files changed

+194
-83
lines changed

8 files changed

+194
-83
lines changed

Framework/Core/include/Framework/AnalysisHelpers.h

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -358,14 +358,14 @@ template <TableRef R>
358358
constexpr auto tableRef2InputSpec()
359359
{
360360
std::vector<framework::ConfigParamSpec> metadata;
361-
auto m = getInputMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
362-
metadata.insert(metadata.end(), m.begin(), m.end());
363-
auto ccdbMetadata = getCCDBMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
364-
metadata.insert(metadata.end(), ccdbMetadata.begin(), ccdbMetadata.end());
365-
auto p = getExpressionMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
366-
metadata.insert(metadata.end(), p.begin(), p.end());
367-
auto idx = getIndexMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
368-
metadata.insert(metadata.end(), idx.begin(), idx.end());
361+
auto sources = getInputMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
362+
metadata.insert(metadata.end(), sources.begin(), sources.end());
363+
auto ccdbURLs = getCCDBMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
364+
metadata.insert(metadata.end(), ccdbURLs.begin(), ccdbURLs.end());
365+
auto expressions = getExpressionMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
366+
metadata.insert(metadata.end(), expressions.begin(), expressions.end());
367+
auto indices = getIndexMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
368+
metadata.insert(metadata.end(), indices.begin(), indices.end());
369369
if constexpr (!soa::with_ccdb_urls<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>) {
370370
metadata.emplace_back(framework::ConfigParamSpec{"schema", framework::VariantType::String, framework::serializeSchema(o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata::getSchema()), {"\"\""}});
371371
}
@@ -382,11 +382,22 @@ constexpr auto tableRef2InputSpec()
382382
template <TableRef R>
383383
constexpr auto tableRef2OutputSpec()
384384
{
385+
std::vector<framework::ConfigParamSpec> metadata;
386+
using md = typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata;
387+
if constexpr (soa::with_ccdb_urls<md>) {
388+
metadata.emplace_back("ccdb:", framework::VariantType::Bool, true, framework::ConfigParamSpec::HelpString{"\"\""});
389+
} else if constexpr(soa::with_expression_pack<md>) {
390+
metadata.emplace_back("projectors", framework::VariantType::Bool, true, framework::ConfigParamSpec::HelpString{"\"\""});
391+
} else if constexpr (soa::with_index_pack<md>) {
392+
metadata.emplace_back("index-records", framework::VariantType::Bool, true, framework::ConfigParamSpec::HelpString{"\"\""});
393+
}
385394
return framework::OutputSpec{
386395
framework::OutputLabel{o2::aod::label<R>()},
387396
o2::aod::origin<R>(),
388397
o2::aod::description(o2::aod::signature<R>()),
389-
R.version};
398+
R.version,
399+
framework::Lifetime::Timeframe,
400+
metadata};
390401
}
391402

392403
template <TableRef R>
@@ -504,14 +515,14 @@ struct OutputForTable {
504515
using table_t = decltype(typeWithRef<T>());
505516
using metadata = aod::MetadataTrait<o2::aod::Hash<table_t::ref.desc_hash>>::metadata;
506517

507-
static OutputSpec const spec()
518+
static constexpr auto spec()
508519
{
509-
return OutputSpec{OutputLabel{aod::label<table_t::ref>()}, o2::aod::origin<table_t::ref>(), o2::aod::description(o2::aod::signature<table_t::ref>()), table_t::ref.version};
520+
return soa::tableRef2OutputSpec<table_t::ref>();
510521
}
511522

512-
static OutputRef ref()
523+
static constexpr auto ref()
513524
{
514-
return OutputRef{aod::label<table_t::ref>(), table_t::ref.version};
525+
return soa::tableRef2OutputRef<table_t::ref>();
515526
}
516527
};
517528

Framework/Core/include/Framework/AnalysisManagers.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,20 +183,20 @@ bool newDataframeCondition(InputRecord& record, C& conditionGroup)
183183

184184
/// Outputs handling
185185
template <typename T>
186-
bool appendOutput(std::vector<OutputSpec>&, T&, uint32_t)
186+
constexpr bool appendOutput(std::vector<OutputSpec>&, T&, uint32_t)
187187
{
188188
return false;
189189
}
190190

191191
template <is_produces T>
192-
bool appendOutput(std::vector<OutputSpec>& outputs, T&, uint32_t)
192+
constexpr bool appendOutput(std::vector<OutputSpec>& outputs, T&, uint32_t)
193193
{
194-
outputs.emplace_back(OutputForTable<typename T::persistent_table_t>::spec());
194+
outputs.emplace_back(soa::tableRef2OutputSpec<T::persistent_table_t::ref>());
195195
return true;
196196
}
197197

198198
template <is_produces_group T>
199-
bool appendOutput(std::vector<OutputSpec>& outputs, T& producesGroup, uint32_t hash)
199+
constexpr bool appendOutput(std::vector<OutputSpec>& outputs, T& producesGroup, uint32_t hash)
200200
{
201201
homogeneous_apply_refs<true>([&outputs, hash](auto& produces) { return appendOutput(outputs, produces, hash); }, producesGroup);
202202
return true;
@@ -261,7 +261,7 @@ bool prepareOutput(ProcessingContext&, T&)
261261
template <is_produces T>
262262
bool prepareOutput(ProcessingContext& context, T& produces)
263263
{
264-
produces.resetCursor(std::move(context.outputs().make<TableBuilder>(OutputForTable<typename T::persistent_table_t>::ref())));
264+
produces.resetCursor(std::move(context.outputs().make<TableBuilder>(soa::tableRef2OutputRef<T::persistent_table_t::ref>())));
265265
return true;
266266
}
267267

Framework/Core/include/Framework/DanglingEdgesContext.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,24 @@ struct OutputObjectInfo {
3333
// been requested and for which we will need to inject
3434
// some source device.
3535
struct DanglingEdgesContext {
36+
// generic AOD tables
3637
std::vector<InputSpec> requestedAODs;
3738
std::vector<OutputSpec> providedAODs;
39+
// extension tables
3840
std::vector<InputSpec> requestedDYNs;
3941
std::vector<OutputSpec> providedDYNs;
42+
// index tables
4043
std::vector<InputSpec> requestedIDXs;
44+
std::vector<OutputSpec> providedIDXs;
45+
// ccdb tables
4146
std::vector<OutputSpec> providedTIMs;
4247
std::vector<InputSpec> requestedTIMs;
48+
// output objects
4349
std::vector<OutputSpec> providedOutputObjHist;
50+
// inputs for the extension spawner
4451
std::vector<InputSpec> spawnerInputs;
52+
// inputs for the index builder
53+
std::vector<InputSpec> builderInputs;
4554

4655
// These are the timestamped tables which are required to
4756
// inject the the CCDB objecs.

Framework/Core/include/Framework/DataSpecViews.h

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,31 @@
1414
#include "Framework/DataSpecUtils.h"
1515
#include <ranges>
1616

17+
namespace o2::framework::checks
18+
{
19+
static auto has_params_with_name(std::string&& name)
20+
{
21+
return [name](ConfigParamSpec const& p){return p.name.compare(name) == 0;};
22+
}
23+
24+
static auto has_params_with_name_starting(std::string&& name)
25+
{
26+
return [name](ConfigParamSpec const& p){return p.name.starts_with(name);};
27+
}
28+
}
29+
1730
namespace o2::framework::views
1831
{
32+
static auto filter_with_params_by_name(std::string&& name)
33+
{
34+
return std::views::filter([name = std::move(name)](auto const& spec) mutable { return std::ranges::any_of(spec.metadata, checks::has_params_with_name(std::move(name))); });
35+
}
36+
37+
static auto filter_with_params_by_name_starting(std::string&& name)
38+
{
39+
return std::views::filter([name = std::move(name)](auto const& spec) mutable { return std::ranges::any_of(spec.metadata, checks::has_params_with_name_starting(std::move(name))); });
40+
}
41+
1942
static auto partial_match_filter(auto what)
2043
{
2144
return std::views::filter([what](auto const& t) -> bool { return DataSpecUtils::partialMatch(t, what); });

Framework/Core/src/AnalysisSupportHelpers.cxx

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,14 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector<InputSpec> c
175175
// FIXME: until we have a single list of pairs
176176
additionalInputs |
177177
views::partial_match_filter(AODOrigins) |
178+
std::ranges::views::filter([](InputSpec const& input){
179+
return std::ranges::none_of(input.metadata, [](ConfigParamSpec const& p){ return (p.name.compare("projectors") == 0) || (p.name.compare("index-records") == 0); });
180+
}) |
178181
sinks::update_input_list{requestedAODs}; // update requestedAODs
179182
additionalInputs |
180-
views::partial_match_filter(header::DataOrigin{"DYN"}) |
183+
std::ranges::views::filter([](InputSpec const& input){
184+
return std::ranges::any_of(input.metadata, [](ConfigParamSpec const& p){ return p.name.compare("projectors") == 0; });
185+
}) |
181186
sinks::update_input_list{requestedDYNs}; // update requestedDYNs
182187
}
183188

Framework/Core/src/ArrowSupport.cxx

Lines changed: 51 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ O2_DECLARE_DYNAMIC_LOG(rate_limiting);
5050

5151
namespace o2::framework
5252
{
53-
5453
class EndOfStreamContext;
5554
class ProcessingContext;
5655

@@ -578,45 +577,80 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
578577
} },
579578
.adjustTopology = [](WorkflowSpecNode& node, ConfigContext const& ctx) {
580579
auto& workflow = node.specs;
581-
auto spawner = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-spawner"); });
582-
auto analysisCCDB = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-ccdb"); });
583-
auto builder = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-index-builder"); });
584-
auto writer = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-writer"); });
585580
auto& dec = ctx.services().get<DanglingEdgesContext>();
586581
dec.requestedAODs.clear();
587582
dec.requestedDYNs.clear();
588-
dec.providedDYNs.clear();
589-
dec.providedTIMs.clear();
590-
dec.requestedTIMs.clear();
591583

592584
auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
593585
auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
594586

587+
auto builder = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-index-builder"); });
595588
if (builder != workflow.end()) {
596589
// collect currently requested IDXs
597590
dec.requestedIDXs.clear();
591+
dec.providedIDXs.clear();
598592
for (auto& d : workflow | views::exclude_by_name(builder->name)) {
599593
d.inputs |
600-
views::partial_match_filter(header::DataOrigin{"IDX"}) |
594+
views::filter_with_params_by_name("index-records") |
601595
sinks::update_input_list{dec.requestedIDXs};
596+
d.outputs |
597+
views::filter_with_params_by_name("index-records") |
598+
sinks::update_output_list{dec.providedIDXs};
602599
}
600+
std::ranges::sort(dec.requestedIDXs, inputSpecLessThan);
601+
std::ranges::sort(dec.providedIDXs, outputSpecLessThan);
602+
dec.builderInputs.clear();
603+
dec.requestedIDXs |
604+
views::filter_not_matching(dec.providedIDXs) |
605+
sinks::append_to{dec.builderInputs};
603606
// recreate inputs and outputs
604607
builder->inputs.clear();
605608
builder->outputs.clear();
609+
AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.builderInputs, dec.requestedAODs, dec.requestedDYNs, *builder);
610+
if (!builder->inputs.empty()) {
611+
// load real AlgorithmSpec before deployment
612+
builder->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "IndexTableBuilder", ctx);
613+
}
614+
}
615+
616+
auto analysisCCDB = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-ccdb"); });
617+
if (analysisCCDB != workflow.end()) {
618+
dec.requestedTIMs.clear();
619+
dec.providedTIMs.clear();
620+
for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
621+
d.inputs |
622+
views::filter_with_params_by_name_starting("ccdb:") |
623+
sinks::update_input_list{dec.requestedTIMs};
624+
d.outputs |
625+
views::filter_with_params_by_name_starting("ccdb:") |
626+
sinks::append_to{dec.providedTIMs};
627+
}
628+
std::ranges::sort(dec.requestedTIMs, inputSpecLessThan);
629+
std::ranges::sort(dec.providedTIMs, outputSpecLessThan);
630+
// Use ranges::to<std::vector<>> in C++23...
631+
dec.analysisCCDBInputs.clear();
632+
dec.requestedTIMs |
633+
views::filter_not_matching(dec.providedTIMs) |
634+
sinks::append_to{dec.analysisCCDBInputs};
606635

636+
// recreate inputs and outputs
637+
analysisCCDB->outputs.clear();
638+
analysisCCDB->inputs.clear();
639+
AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.analysisCCDBInputs, dec.requestedAODs, dec.requestedDYNs, *analysisCCDB);
607640
// load real AlgorithmSpec before deployment
608-
builder->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "IndexTableBuilder", ctx);
609-
AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.requestedIDXs, dec.requestedAODs, dec.requestedDYNs, *builder);
641+
analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
610642
}
611643

644+
auto spawner = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-spawner"); });
612645
if (spawner != workflow.end()) {
646+
dec.providedDYNs.clear();
613647
// collect currently requested DYNs
614648
for (auto& d : workflow | views::exclude_by_name(spawner->name)) {
615649
d.inputs |
616-
views::partial_match_filter(header::DataOrigin{"DYN"}) |
650+
views::filter_with_params_by_name("projectors") |
617651
sinks::update_input_list{dec.requestedDYNs};
618652
d.outputs |
619-
views::partial_match_filter(header::DataOrigin{"DYN"}) |
653+
views::filter_with_params_by_name("projectors") |
620654
sinks::append_to{dec.providedDYNs};
621655
}
622656
std::ranges::sort(dec.requestedDYNs, inputSpecLessThan);
@@ -628,32 +662,14 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
628662
// recreate inputs and outputs
629663
spawner->outputs.clear();
630664
spawner->inputs.clear();
631-
632-
// load real AlgorithmSpec before deployment
633-
spawner->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx);
634665
AnalysisSupportHelpers::addMissingOutputsToSpawner({}, dec.spawnerInputs, dec.requestedAODs, *spawner);
635-
}
636-
637-
if (analysisCCDB != workflow.end()) {
638-
for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
639-
d.inputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::update_input_list{dec.requestedTIMs};
640-
d.outputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::append_to{dec.providedTIMs};
666+
if (!spawner->inputs.empty()) {
667+
// load real AlgorithmSpec before deployment
668+
spawner->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx);
641669
}
642-
std::ranges::sort(dec.requestedTIMs, inputSpecLessThan);
643-
std::ranges::sort(dec.providedTIMs, outputSpecLessThan);
644-
// Use ranges::to<std::vector<>> in C++23...
645-
dec.analysisCCDBInputs.clear();
646-
dec.requestedTIMs | views::filter_not_matching(dec.providedTIMs) | sinks::append_to{dec.analysisCCDBInputs};
647-
648-
// recreate inputs and outputs
649-
analysisCCDB->outputs.clear();
650-
analysisCCDB->inputs.clear();
651-
// load real AlgorithmSpec before deployment
652-
// FIXME how can I make the lookup depend on DYN tables as well??
653-
analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
654-
AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.analysisCCDBInputs, dec.requestedAODs, dec.requestedDYNs, *analysisCCDB);
655670
}
656671

672+
auto writer = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-writer"); });
657673
if (writer != workflow.end()) {
658674
workflow.erase(writer);
659675
}

0 commit comments

Comments
 (0)