Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion tree/ntuple/inc/ROOT/RField/RFieldFundamental.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,11 @@ protected:
{
const auto r = Base::GetColumnRepresentatives();
const auto n = r.size();
// Note that we don't assume that we have 0 columns here, as this function may be called to extend the
// column representations of this field.
const auto first = fAvailableColumns.size();
fAvailableColumns.reserve(n);
for (std::uint16_t i = 0; i < n; ++i) {
for (auto i = first; i < n; ++i) {
auto &column = fAvailableColumns.emplace_back(ROOT::Internal::RColumn::Create<T>(r[i][0], 0, i));
if (r[i][0] == ROOT::ENTupleColumnType::kReal32Trunc) {
column->SetBitsOnStorage(fBitWidth);
Expand Down
17 changes: 15 additions & 2 deletions tree/ntuple/inc/ROOT/RFieldBase.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ void CallFlushColumnsOnField(RFieldBase &);
void CallCommitClusterOnField(RFieldBase &);
void CallConnectPageSinkOnField(RFieldBase &, ROOT::Internal::RPageSink &, ROOT::NTupleSize_t firstEntry = 0);
void CallConnectPageSourceOnField(RFieldBase &, ROOT::Internal::RPageSource &);
void CallConnectExtendedColumnsToPageSinkOnField(RFieldBase &, ROOT::Internal::RPageSink &,
ROOT::NTupleSize_t firstEntry);
ROOT::RResult<std::unique_ptr<ROOT::RFieldBase>>
CallFieldBaseCreate(const std::string &fieldName, const std::string &typeName, const ROOT::RCreateFieldOptions &options,
const ROOT::RNTupleDescriptor *desc, ROOT::DescriptorId_t fieldId);
Expand Down Expand Up @@ -86,6 +88,8 @@ class RFieldBase {
friend void Internal::CallCommitClusterOnField(RFieldBase &);
friend void Internal::CallConnectPageSinkOnField(RFieldBase &, ROOT::Internal::RPageSink &, ROOT::NTupleSize_t);
friend void Internal::CallConnectPageSourceOnField(RFieldBase &, ROOT::Internal::RPageSource &);
friend void
Internal::CallConnectExtendedColumnsToPageSinkOnField(RFieldBase &, ROOT::Internal::RPageSink &, ROOT::NTupleSize_t);
friend ROOT::RResult<std::unique_ptr<ROOT::RFieldBase>>
Internal::CallFieldBaseCreate(const std::string &fieldName, const std::string &typeName,
const ROOT::RCreateFieldOptions &options, const ROOT::RNTupleDescriptor *desc,
Expand Down Expand Up @@ -261,6 +265,9 @@ private:
/// calling this function. For subfields, a field ID may or may not be set. If the field ID is unset, it will be
/// determined using the page source descriptor, based on the parent field ID and the subfield name.
void ConnectPageSource(ROOT::Internal::RPageSource &pageSource);
/// Similar to ConnectPageSink, but only used to connect new columns that were added via late model extension.
/// The field must be already connected to the sink.
void ConnectExtendedColumnsToPageSink(ROOT::Internal::RPageSink &pageSink, ROOT::NTupleSize_t firstEntry = 0);

void SetArtificial()
{
Expand Down Expand Up @@ -357,8 +364,13 @@ protected:
GenerateColumnsImpl<0, ColumnCppTs...>(GetColumnRepresentations().GetSerializationDefault(), 0);
} else {
const auto N = fColumnRepresentatives.size();
fAvailableColumns.reserve(N * sizeof...(ColumnCppTs));
for (unsigned i = 0; i < N; ++i) {
constexpr auto cardinality = sizeof...(ColumnCppTs);
static_assert(cardinality > 0, "GenerateColumnsImpl must be called with at least 1 type argument");
fAvailableColumns.reserve(N * cardinality);
// Note that we don't assume that we have 0 columns here, as this function may be called to extend the
// column representations of this field.
const auto first = fAvailableColumns.size();
for (auto i = first / cardinality; i < N; ++i) {
GenerateColumnsImpl<0, ColumnCppTs...>(fColumnRepresentatives[i].get(), i);
}
}
Expand Down Expand Up @@ -958,6 +970,7 @@ struct RFieldRepresentationModifier {
const auto N = field.fColumnRepresentatives[0].get().size();
R__ASSERT(N >= 1 && N <= 2);
R__ASSERT(field.fPrincipalColumn);
R__ASSERT(newRepresentationIdx * N < field.fAvailableColumns.size());
field.fPrincipalColumn = field.fAvailableColumns[newRepresentationIdx * N].get();
if (field.fAuxiliaryColumn) {
R__ASSERT(N == 2);
Expand Down
6 changes: 6 additions & 0 deletions tree/ntuple/inc/ROOT/RNTupleFillContext.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@

namespace ROOT {

namespace Internal {
// Used for testing
RPageSink &GetWriterSink(ROOT::RNTupleWriter &writer);
} // namespace Internal

// clang-format off
/**
\class ROOT::RNTupleFillContext
Expand All @@ -49,6 +54,7 @@ sequential writing, please refer to RNTupleWriter.
class RNTupleFillContext {
friend class ROOT::RNTupleWriter;
friend class RNTupleParallelWriter;
friend Internal::RPageSink &Internal::GetWriterSink(RNTupleWriter &);

private:
/// The page sink's parallel page compression scheduler if IMT is on.
Expand Down
10 changes: 10 additions & 0 deletions tree/ntuple/inc/ROOT/RNTupleModel.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -402,11 +402,18 @@ You will not normally use this directly; see `RNTupleModel::RUpdater` instead.
*/
// clang-format on
struct RNTupleModelChangeset {
struct RAddedColumnRepr {
ROOT::RFieldBase *fField = nullptr;
std::uint16_t fNumNewColumns = 0;
};

RNTupleModel &fModel;
/// Points to the fields in fModel that were added as part of an updater transaction
std::vector<ROOT::RFieldBase *> fAddedFields;
/// Points to the projected fields in fModel that were added as part of an updater transaction
std::vector<ROOT::RFieldBase *> fAddedProjectedFields;
/// Points to fields in fModel that had new column representations appended to them.
std::vector<RAddedColumnRepr> fAddedColumnReprs;

RNTupleModelChangeset(RNTupleModel &model) : fModel(model) {}
bool IsEmpty() const { return fAddedFields.empty() && fAddedProjectedFields.empty(); }
Expand All @@ -420,6 +427,9 @@ struct RNTupleModelChangeset {
/// \see RNTupleModel::AddProjectedField()
ROOT::RResult<void>
AddProjectedField(std::unique_ptr<ROOT::RFieldBase> field, RNTupleModel::FieldMappingFunc_t mapping);

ROOT::RResult<void>
AddColumnRepr(ROOT::RFieldBase *field, const ROOT::RFieldBase::RColumnRepresentations::Selection_t &reprs);
};

} // namespace Internal
Expand Down
1 change: 1 addition & 0 deletions tree/ntuple/inc/ROOT/RNTupleWriter.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class RNTupleWriter {
friend std::unique_ptr<RNTupleWriter>
Experimental::RNTupleWriter_Append(std::unique_ptr<ROOT::RNTupleModel> model, std::string_view ntuplePath,
ROOT::Experimental::RFile &file, const ROOT::RNTupleWriteOptions &options);
friend Internal::RPageSink &Internal::GetWriterSink(RNTupleWriter &);

private:
RNTupleFillContext fFillContext;
Expand Down
35 changes: 33 additions & 2 deletions tree/ntuple/src/RFieldBase.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ void ROOT::Internal::CallConnectPageSourceOnField(RFieldBase &field, ROOT::Inter
{
field.ConnectPageSource(source);
}
void ROOT::Internal::CallConnectExtendedColumnsToPageSinkOnField(RFieldBase &field, ROOT::Internal::RPageSink &sink,
ROOT::NTupleSize_t firstEntry)
{
field.ConnectExtendedColumnsToPageSink(sink, firstEntry);
}

ROOT::RResult<std::unique_ptr<ROOT::RFieldBase>>
ROOT::Internal::CallFieldBaseCreate(const std::string &fieldName, const std::string &typeName,
Expand Down Expand Up @@ -825,8 +830,17 @@ ROOT::RFieldBase::RColumnRepresentations::Selection_t ROOT::RFieldBase::GetColum

void ROOT::RFieldBase::SetColumnRepresentatives(const RColumnRepresentations::Selection_t &representatives)
{
if (fState != EState::kUnconnected)
throw RException(R__FAIL("cannot set column representative once field is connected"));
const auto isNewSelectionASuperset = [&]() {
if (representatives.size() < fColumnRepresentatives.size())
return false;
for (auto i = 0u; i < fColumnRepresentatives.size(); ++i)
if (representatives[i] != fColumnRepresentatives[i].get())
return false;
return true;
};
if (fState != EState::kUnconnected && !(fState == EState::kConnectedToSink && isNewSelectionASuperset()))
throw RException(R__FAIL("cannot set column representative once field is connected unless the new selection is a "
"superset of the previous"));
const auto &validTypes = GetColumnRepresentations().GetSerializationTypes();
fColumnRepresentatives.clear();
fColumnRepresentatives.reserve(representatives.size());
Expand Down Expand Up @@ -944,6 +958,23 @@ void ROOT::RFieldBase::ConnectPageSink(ROOT::Internal::RPageSink &pageSink, ROOT
fState = EState::kConnectedToSink;
}

void ROOT::RFieldBase::ConnectExtendedColumnsToPageSink(ROOT::Internal::RPageSink &pageSink,
ROOT::NTupleSize_t firstEntry)
{
if (fState != EState::kConnectedToSink)
throw RException(
R__FAIL("invalid attempt to connect extended columns of a field that is not connected to a sink"));

const auto nPrev = fAvailableColumns.size();
GenerateColumns();

for (auto i = nPrev, len = fAvailableColumns.size(); i < len; ++i) {
auto &column = fAvailableColumns[i];
auto firstElementIndex = (column->GetIndex() == 0) ? EntryToColumnElementIndex(firstEntry) : 0;
column->ConnectPageSink(fOnDiskId, pageSink, firstElementIndex);
}
}

void ROOT::RFieldBase::ConnectPageSource(ROOT::Internal::RPageSource &pageSource)
{
if (dynamic_cast<ROOT::RFieldZero *>(this)) {
Expand Down
21 changes: 21 additions & 0 deletions tree/ntuple/src/RNTupleModel.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,27 @@ ROOT::RNTupleModel::RUpdater::AddProjectedField(std::unique_ptr<ROOT::RFieldBase
return R__FORWARD_RESULT(fOpenChangeset.AddProjectedField(std::move(field), std::move(mapping)));
}

ROOT::RResult<void> ROOT::Internal::RNTupleModelChangeset::AddColumnRepr(
ROOT::RFieldBase *field, const ROOT::RFieldBase::RColumnRepresentations::Selection_t &newReprs)
{
auto reprs = field->GetColumnRepresentatives();
const auto nPrev = reprs.size();
for (auto newRepr : newReprs)
// NOTE: we don't need to check for duplicates because SetColumnRepresentatives will do that for us.
reprs.push_back(newRepr);

field->SetColumnRepresentatives(reprs);
const auto nNew = field->GetColumnRepresentatives().size();
assert(reprs.size() > 0 && reprs[0].size() <= 2);
const auto cardinality = static_cast<std::uint16_t>(reprs[0].size());
assert(nNew >= nPrev);
std::uint16_t nAdded = (nNew - nPrev) * cardinality;
if (nAdded > 0)
fAddedColumnReprs.push_back(RAddedColumnRepr{field, nAdded});

return RResult<void>::Success();
}

void ROOT::RNTupleModel::EnsureValidFieldName(std::string_view fieldName)
{
RResult<void> nameValid = ROOT::Internal::EnsureValidNameForRNTuple(fieldName, "Field");
Expand Down
5 changes: 5 additions & 0 deletions tree/ntuple/src/RNTupleWriter.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,8 @@ ROOT::Experimental::RNTupleWriter_Append(std::unique_ptr<ROOT::RNTupleModel> mod
auto sink = std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleBasename, file, ntupleDir, options);
return ROOT::RNTupleWriter::Create(std::move(model), std::move(sink), options);
}

ROOT::Internal::RPageSink &ROOT::Internal::GetWriterSink(ROOT::RNTupleWriter &writer)
{
return *writer.fFillContext.fSink;
}
8 changes: 8 additions & 0 deletions tree/ntuple/src/RPageSinkBuf.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,20 @@ void ROOT::Internal::RPageSinkBuf::UpdateSchema(const ROOT::Internal::RNTupleMod
GetProjectedFieldsOfModel(*fInnerModel).Add(std::move(cloned), fieldMap);
return p;
};
auto cloneAddColumnRepr = [&](const RNTupleModelChangeset::RAddedColumnRepr &repr) {
auto &innerField = fInnerModel->GetMutableField(repr.fField->GetFieldName());
innerField.SetColumnRepresentatives(repr.fField->GetColumnRepresentatives());
ROOT::Internal::CallConnectExtendedColumnsToPageSinkOnField(innerField, *this, firstEntry);
return repr;
};
RNTupleModelChangeset innerChangeset{*fInnerModel};
fInnerModel->Unfreeze();
std::transform(changeset.fAddedFields.cbegin(), changeset.fAddedFields.cend(),
std::back_inserter(innerChangeset.fAddedFields), cloneAddField);
std::transform(changeset.fAddedProjectedFields.cbegin(), changeset.fAddedProjectedFields.cend(),
std::back_inserter(innerChangeset.fAddedProjectedFields), cloneAddProjectedField);
std::transform(changeset.fAddedColumnReprs.cbegin(), changeset.fAddedColumnReprs.cend(),
std::back_inserter(innerChangeset.fAddedColumnReprs), cloneAddColumnRepr);
fInnerModel->Freeze();
fInnerSink->UpdateSchema(innerChangeset, firstEntry);
}
Expand Down
6 changes: 6 additions & 0 deletions tree/ntuple/src/RPageStorage.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,9 @@ void ROOT::Internal::RPagePersistentSink::UpdateSchema(const ROOT::Internal::RNT
for (const auto &descendant : *f)
nNewPhysicalColumns += getNColumns(descendant);
}
for (auto added : changeset.fAddedColumnReprs) {
nNewPhysicalColumns += added.fNumNewColumns;
}
fDescriptorBuilder.ShiftAliasColumns(nNewPhysicalColumns);
}

Expand Down Expand Up @@ -913,6 +916,9 @@ void ROOT::Internal::RPagePersistentSink::UpdateSchema(const ROOT::Internal::RNT
for (auto &descendant : *f)
addProjectedField(descendant);
}
for (const auto& [f, _] : changeset.fAddedColumnReprs) {
ROOT::Internal::CallConnectExtendedColumnsToPageSinkOnField(*f, *this, firstEntry);
}

const auto nColumns = descriptor.GetNPhysicalColumns();
fOpenColumnRanges.reserve(fOpenColumnRanges.size() + (nColumns - nColumnsBeforeUpdate));
Expand Down
Loading
Loading