@@ -91,14 +91,17 @@ HiveDataSource::HiveDataSource(
     FileHandleFactory* fileHandleFactory,
     folly::Executor* ioExecutor,
     const ConnectorQueryCtx* connectorQueryCtx,
-    const std::shared_ptr<HiveConfig>& hiveConfig)
-    : fileHandleFactory_(fileHandleFactory),
+    const std::shared_ptr<HiveConfig>& hiveConfig,
+    bool pushdownCasts)
+    : assignments_(assignments),
+      fileHandleFactory_(fileHandleFactory),
       ioExecutor_(ioExecutor),
       connectorQueryCtx_(connectorQueryCtx),
       hiveConfig_(hiveConfig),
       pool_(connectorQueryCtx->memoryPool()),
       outputType_(outputType),
-      expressionEvaluator_(connectorQueryCtx->expressionEvaluator()) {
+      expressionEvaluator_(connectorQueryCtx->expressionEvaluator()),
+      pushdownCasts_(pushdownCasts) {
   hiveTableHandle_ =
       std::dynamic_pointer_cast<const HiveTableHandle>(tableHandle);
   VELOX_CHECK_NOT_NULL(
@@ -138,22 +141,44 @@ HiveDataSource::HiveDataSource(
   }
 
   std::vector<std::string> readColumnNames;
-  auto readColumnTypes = outputType_->children();
-  for (const auto& outputName : outputType_->names()) {
-    auto it = assignments.find(outputName);
+  std::vector<TypePtr> readColumnTypes;
+  std::vector<std::string> readColumnNamesWithoutUpcasts;
+  std::vector<TypePtr> readColumnTypesWithoutUpcasts;
+
+  // outputType_ contains the upcast columns if pushdownCasts_ is true.
+  for (int i = 0; i < outputType_->size(); ++i) {
+    auto columnName = outputType_->nameOf(i); // e.g. order_id_21_upcast
+    auto& columnType = outputType_->childAt(i);
+
+    auto originalColumnName = columnName;
+    if (pushdownCasts_ && columnName.ends_with("_upcast")) {
+      originalColumnName =
+          columnName.substr(0, columnName.size() - strlen("_upcast"));
+    }
+
+    // Get the ColumnHandle name. This is the name without aliasing, e.g.
+    // originalColumnName="order_id_21" and columnHandleName="order_id".
+    auto it = assignments_.find(originalColumnName);
     VELOX_CHECK(
-        it != assignments.end(),
+        it != assignments_.end(),
         "ColumnHandle is missing for output column: {}",
-        outputName);
-
+        columnName);
     auto* handle = static_cast<const HiveColumnHandle*>(it->second.get());
-    readColumnNames.push_back(handle->name());
+    auto columnHandleName = handle->name();
+
+    if (!pushdownCasts_ || !columnName.ends_with("_upcast")) {
+      readColumnNamesWithoutUpcasts.push_back(columnHandleName);
+      readColumnTypesWithoutUpcasts.push_back(columnType);
+    }
+    readColumnNames.push_back(columnHandleName);
+    readColumnTypes.push_back(columnType);
+
     for (auto& subfield : handle->requiredSubfields()) {
       VELOX_USER_CHECK_EQ(
           getColumnName(subfield),
           handle->name(),
           "Required subfield does not match column name");
-      subfields_[handle->name()].push_back(&subfield);
+      subfields_[columnHandleName].push_back(&subfield);
     }
     columnPostProcessors_.push_back(handle->postProcessor());
   }
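The loop above relies on a naming convention for pushed-down casts: an upcast output column carries an "_upcast" suffix, and stripping that suffix recovers the original output name used to look up its ColumnHandle in assignments_. A minimal standalone sketch of that convention follows; the helper name and constant are illustrative only, not part of this patch.

#include <string>
#include <string_view>

// Illustrative helper (not in the patch); requires C++20 for ends_with.
constexpr std::string_view kUpcastSuffix = "_upcast";

std::string stripUpcastSuffix(const std::string& columnName) {
  // "order_id_21_upcast" -> "order_id_21"; names without the suffix are
  // returned unchanged.
  if (columnName.ends_with(kUpcastSuffix)) {
    return columnName.substr(0, columnName.size() - kUpcastSuffix.size());
  }
  return columnName;
}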
@@ -195,7 +220,7 @@ HiveDataSource::HiveDataSource(
       }
       // Remaining filter may reference columns that are not used otherwise,
       // e.g. are not being projected out and are not used in range filters.
-      // Make sure to add these columns to readerOutputType_.
+      // Make sure to add these columns to readerOutputTypeWithoutUpcasts_.
       readColumnNames.push_back(input->field());
       readColumnTypes.push_back(input->type());
     }
@@ -222,8 +247,12 @@ HiveDataSource::HiveDataSource(
 
   readerOutputType_ =
       ROW(std::move(readColumnNames), std::move(readColumnTypes));
+  // NO upcast columns
+  readerOutputTypeWithoutUpcasts_ =
+      ROW(std::move(readColumnNamesWithoutUpcasts),
+          std::move(readColumnTypesWithoutUpcasts));
   scanSpec_ = makeScanSpec(
-      readerOutputType_,
+      readerOutputTypeWithoutUpcasts_,
       subfields_,
       filters_,
       hiveTableHandle_->dataColumns(),
@@ -249,7 +278,7 @@ std::unique_ptr<SplitReader> HiveDataSource::createSplitReader() {
       &partitionKeys_,
       connectorQueryCtx_,
       hiveConfig_,
-      readerOutputType_,
+      readerOutputTypeWithoutUpcasts_,
       ioStats_,
       fsStats_,
       fileHandleFactory_,
@@ -273,11 +302,12 @@ std::vector<column_index_t> HiveDataSource::setupBucketConversion() {
     if (subfields_.erase(handle->name()) > 0) {
       rebuildScanSpec = true;
     }
-    auto index = readerOutputType_->getChildIdxIfExists(handle->name());
+    auto index =
+        readerOutputTypeWithoutUpcasts_->getChildIdxIfExists(handle->name());
     if (!index.has_value()) {
       if (names.empty()) {
-        names = readerOutputType_->names();
-        types = readerOutputType_->children();
+        names = readerOutputTypeWithoutUpcasts_->names();
+        types = readerOutputTypeWithoutUpcasts_->children();
       }
       index = names.size();
       names.push_back(handle->name());
@@ -288,11 +318,11 @@ std::vector<column_index_t> HiveDataSource::setupBucketConversion() {
     bucketChannels.push_back(*index);
   }
   if (!names.empty()) {
-    readerOutputType_ = ROW(std::move(names), std::move(types));
+    readerOutputTypeWithoutUpcasts_ = ROW(std::move(names), std::move(types));
   }
   if (rebuildScanSpec) {
     auto newScanSpec = makeScanSpec(
-        readerOutputType_,
+        readerOutputTypeWithoutUpcasts_,
         subfields_,
         filters_,
         hiveTableHandle_->dataColumns(),
@@ -314,7 +344,8 @@ void HiveDataSource::setupRowIdColumn() {
   auto* rowId = scanSpec_->childByName(*specialColumns_.rowId);
   VELOX_CHECK_NOT_NULL(rowId);
   auto& rowIdType =
-      readerOutputType_->findChild(*specialColumns_.rowId)->asRow();
+      readerOutputTypeWithoutUpcasts_->findChild(*specialColumns_.rowId)
+          ->asRow();
   auto rowGroupId = split_->getFileName();
   rowId->childByName(rowIdType.nameOf(1))
       ->setConstantValue<StringView>(
@@ -339,8 +370,6 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
   split_ = std::dynamic_pointer_cast<HiveConnectorSplit>(split);
   VELOX_CHECK_NOT_NULL(split_, "Wrong type of split");
 
-  VLOG(1) << "Adding split " << split_->toString();
-
   if (splitReader_) {
     splitReader_.reset();
   }
@@ -361,7 +390,7 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
   // so we initialize it beforehand.
   splitReader_->configureReaderOptions(randomSkip_);
   splitReader_->prepareSplit(metadataFilter_, runtimeStats_);
-  readerOutputType_ = splitReader_->readerOutputType();
+  readerOutputTypeWithoutUpcasts_ = splitReader_->readerOutputType();
 }
 
 std::optional<RowVectorPtr> HiveDataSource::next(
@@ -380,14 +409,16 @@ std::optional<RowVectorPtr> HiveDataSource::next(
 
   // Bucket conversion or delta update could add extra column to reader output.
   auto needsExtraColumn = [&] {
-    return output_->asUnchecked<RowVector>()->childrenSize() <
-        readerOutputType_->size();
+    return outputWithoutUpcasts_->asUnchecked<RowVector>()->childrenSize() <
+        readerOutputTypeWithoutUpcasts_->size();
   };
-  if (!output_ || needsExtraColumn()) {
-    output_ = BaseVector::create(readerOutputType_, 0, pool_);
+  if (!outputWithoutUpcasts_ || needsExtraColumn()) {
+    outputWithoutUpcasts_ =
+        BaseVector::create(readerOutputTypeWithoutUpcasts_, 0, pool_);
   }
 
-  const auto rowsScanned = splitReader_->next(size, output_);
+  // Read only the real columns, not the upcast columns.
+  const auto rowsScanned = splitReader_->next(size, outputWithoutUpcasts_);
   completedRows_ += rowsScanned;
   if (rowsScanned == 0) {
     splitReader_->updateRuntimeStats(runtimeStats_);
@@ -396,14 +427,15 @@ std::optional<RowVectorPtr> HiveDataSource::next(
   }
 
   VELOX_CHECK(
-      !output_->mayHaveNulls(), "Top-level row vector cannot have nulls");
-  auto rowsRemaining = output_->size();
+      !outputWithoutUpcasts_->mayHaveNulls(),
+      "Top-level row vector cannot have nulls");
+  auto rowsRemaining = outputWithoutUpcasts_->size();
   if (rowsRemaining == 0) {
     // no rows passed the pushed down filters.
     return getEmptyOutput();
   }
 
-  auto rowVector = std::dynamic_pointer_cast<RowVector>(output_);
+  auto rowVector = std::dynamic_pointer_cast<RowVector>(outputWithoutUpcasts_);
 
   // In case there is a remaining filter that excludes some but not all
   // rows, collect the indices of the passing rows. If there is no filter,
@@ -433,12 +465,49 @@ std::optional<RowVectorPtr> HiveDataSource::next(
   std::vector<VectorPtr> outputColumns;
   outputColumns.reserve(outputType_->size());
   for (int i = 0; i < outputType_->size(); ++i) {
-    auto& child = rowVector->childAt(i);
-    if (remainingIndices) {
-      // Disable dictionary values caching in expression eval so that we
-      // don't need to reallocate the result for every batch.
-      child->disableMemo();
+    std::shared_ptr<BaseVector> child;
+    // outputType_ includes the upcast columns; map each output column back
+    // to the corresponding column in the reader output.
+    const auto& columnName = outputType_->nameOf(i);
+    const auto& columnType = outputType_->childAt(i);
+
+    if (columnName.ends_with("_upcast")) {
+      auto originalOutputName =
+          columnName.substr(0, columnName.size() - strlen("_upcast"));
+      auto columnHandleIt = assignments_.find(originalOutputName);
+      VELOX_CHECK(
+          columnHandleIt != assignments_.end(),
+          "Cannot find column handle for upcast column: {}, original: {}",
+          columnName,
+          originalOutputName);
+      auto columnHandleName =
+          static_cast<const HiveColumnHandle*>(columnHandleIt->second.get())
+              ->name();
+
+      // rowVector does not have the upcast columns.
+      auto index = readerOutputTypeWithoutUpcasts_->getChildIdxIfExists(
+          columnHandleName);
+      VELOX_CHECK(index.has_value());
+      auto originalColumn = rowVector->childAt(*index);
+
+      child = BaseVector::create(columnType, originalColumn->size(), pool_);
+      child->copy(originalColumn.get(), 0, 0, originalColumn->size());
+    } else {
+      auto columnHandleIt = assignments_.find(columnName);
+      VELOX_CHECK(
+          columnHandleIt != assignments_.end(),
+          "Cannot find column handle for output column: {}, original: {}",
+          columnName,
+          columnName);
+      auto columnHandleName =
+          static_cast<const HiveColumnHandle*>(columnHandleIt->second.get())
+              ->name();
+      auto index =
+          readerOutputTypeWithoutUpcasts_->getChildIdxIfExists(columnHandleName);
+      VELOX_CHECK(index.has_value());
+      child = rowVector->childAt(*index);
     }
+
     auto column = exec::wrapChild(rowsRemaining, remainingIndices, child);
     if (columnPostProcessors_[i]) {
       columnPostProcessors_[i](column);
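The upcast branch above materializes the wider output column by creating a new vector of the output type and copying the narrower reader column into it. The sketch below shows the intended value-preserving widening (e.g. INTEGER to BIGINT) using plain std::vector rather than Velox vectors, so it is only an illustration of the semantics, not of the Velox API; the function name is hypothetical.

#include <cstdint>
#include <vector>

// Illustration only: widen 32-bit values into 64-bit storage without
// changing the values themselves, which is what the upcast copy is for.
std::vector<int64_t> upcastToInt64(const std::vector<int32_t>& input) {
  std::vector<int64_t> output;
  output.reserve(input.size());
  for (const int32_t value : input) {
    output.push_back(static_cast<int64_t>(value));
  }
  return output;
}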
@@ -541,7 +610,8 @@ void HiveDataSource::setFromDataSource(
   runtimeStats_.skippedSplits += source->runtimeStats_.skippedSplits;
   runtimeStats_.processedSplits += source->runtimeStats_.processedSplits;
   runtimeStats_.skippedSplitBytes += source->runtimeStats_.skippedSplitBytes;
-  readerOutputType_ = std::move(source->readerOutputType_);
+  readerOutputTypeWithoutUpcasts_ =
+      std::move(source->readerOutputTypeWithoutUpcasts_);
   source->scanSpec_->moveAdaptationFrom(*scanSpec_);
   scanSpec_ = std::move(source->scanSpec_);
   metadataFilter_ = std::move(source->metadataFilter_);
@@ -600,7 +670,7 @@ std::shared_ptr<wave::WaveDataSource> HiveDataSource::toWaveDataSource() {
     waveDataSource_ = waveDelegateHook_(
         hiveTableHandle_,
         scanSpec_,
-        readerOutputType_,
+        readerOutputTypeWithoutUpcasts_,
         &partitionKeys_,
         fileHandleFactory_,
         ioExecutor_,