Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationIfToFilterRewriteStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationPartitioningMergingStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.CteMaterializationStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinNotNullInferenceStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy;
Expand Down Expand Up @@ -184,6 +185,7 @@ public final class SystemSessionProperties
public static final String MAX_DRIVERS_PER_TASK = "max_drivers_per_task";
public static final String MAX_TASKS_PER_STAGE = "max_tasks_per_stage";
public static final String DEFAULT_FILTER_FACTOR_ENABLED = "default_filter_factor_enabled";
public static final String CTE_MATERIALIZATION_STRATEGY = "cte_materialization_strategy";
public static final String DEFAULT_JOIN_SELECTIVITY_COEFFICIENT = "default_join_selectivity_coefficient";
public static final String PUSH_LIMIT_THROUGH_OUTER_JOIN = "push_limit_through_outer_join";
public static final String OPTIMIZE_CONSTANT_GROUPING_KEYS = "optimize_constant_grouping_keys";
Expand Down Expand Up @@ -1023,6 +1025,18 @@ public SystemSessionProperties(
"use a default filter factor for unknown filters in a filter node",
featuresConfig.isDefaultFilterFactorEnabled(),
false),
new PropertyMetadata<>(
CTE_MATERIALIZATION_STRATEGY,
format("The strategy to materialize common table expressions. Options are %s",
Stream.of(CteMaterializationStrategy.values())
.map(CteMaterializationStrategy::name)
.collect(joining(","))),
VARCHAR,
CteMaterializationStrategy.class,
featuresConfig.getCteMaterializationStrategy(),
false,
value -> CteMaterializationStrategy.valueOf(((String) value).toUpperCase()),
CteMaterializationStrategy::name),
new PropertyMetadata<>(
DEFAULT_JOIN_SELECTIVITY_COEFFICIENT,
"use a default join selectivity coefficient factor when column statistics are not available in a join node",
Expand Down Expand Up @@ -2280,6 +2294,11 @@ public static DataSize getFilterAndProjectMinOutputPageSize(Session session)
return session.getSystemProperty(FILTER_AND_PROJECT_MIN_OUTPUT_PAGE_SIZE, DataSize.class);
}

public static boolean isMaterializeAllCtes(Session session)
{
return session.getSystemProperty(CTE_MATERIALIZATION_STRATEGY, CteMaterializationStrategy.class).equals(CteMaterializationStrategy.ALL);
}

public static int getFilterAndProjectMinOutputPageRowCount(Session session)
{
return session.getSystemProperty(FILTER_AND_PROJECT_MIN_OUTPUT_PAGE_ROW_COUNT, Integer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@

import com.facebook.presto.Session;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.CteConsumerNode;
import com.facebook.presto.spi.plan.CteProducerNode;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.LimitNode;
import com.facebook.presto.spi.plan.OutputNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.SequenceNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.UnionNode;
import com.facebook.presto.spi.plan.ValuesNode;
Expand All @@ -46,6 +49,7 @@
import java.util.Optional;
import java.util.stream.Stream;

import static com.facebook.presto.cost.CostCalculatorWithEstimatedExchanges.calculateCteProducerCost;
import static com.facebook.presto.cost.CostCalculatorWithEstimatedExchanges.calculateJoinInputCost;
import static com.facebook.presto.cost.CostCalculatorWithEstimatedExchanges.calculateLocalRepartitionCost;
import static com.facebook.presto.cost.CostCalculatorWithEstimatedExchanges.calculateRemoteGatherCost;
Expand Down Expand Up @@ -155,6 +159,25 @@ public PlanCostEstimate visitFilter(FilterNode node, Void context)
return costForStreaming(node, localCost);
}

@Override
public PlanCostEstimate visitCteProducer(CteProducerNode node, Void context)
{
LocalCostEstimate localCost = calculateCteProducerCost(stats, node.getSource());
return costForStreaming(node, localCost);
}

@Override
public PlanCostEstimate visitCteConsumer(CteConsumerNode node, Void context)
{
return node.getOriginalSource().accept(this, context);
}

@Override
public PlanCostEstimate visitSequence(SequenceNode node, Void context)
{
return costForStreaming(node, LocalCostEstimate.zero());
}

@Override
public PlanCostEstimate visitProject(ProjectNode node, Void context)
{
Expand Down Expand Up @@ -349,6 +372,19 @@ private PlanCostEstimate costForStreaming(PlanNode node, LocalCostEstimate local
sourcesCost.getNetworkCost() + localCost.getNetworkCost());
}

private PlanCostEstimate costForStreamingForCteConsumer(CteConsumerNode node, LocalCostEstimate localCost)
{
PlanCostEstimate sourcesCost = getSourcesEstimations(node.getStatsEquivalentPlanNode().get())
.reduce(PlanCostEstimate.zero(), CostCalculatorUsingExchanges::addParallelSiblingsCost);
return new PlanCostEstimate(
sourcesCost.getCpuCost() + localCost.getCpuCost(),
max(
sourcesCost.getMaxMemory(), // Streaming operator allocates insignificant amount of memory (usually none) before first input page is received
sourcesCost.getMaxMemoryWhenOutputting() + localCost.getMaxMemory()),
sourcesCost.getMaxMemoryWhenOutputting() + localCost.getMaxMemory(),
sourcesCost.getNetworkCost() + localCost.getNetworkCost());
}

private PlanCostEstimate costForLookupJoin(PlanNode node, LocalCostEstimate localCost)
{
verify(node.getSources().size() == 2, "Unexpected number of sources for %s: %s", node, node.getSources());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.IntersectNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.SequenceNode;
import com.facebook.presto.spi.plan.UnionNode;
import com.facebook.presto.sql.planner.iterative.GroupReference;
import com.facebook.presto.sql.planner.iterative.rule.DetermineJoinDistributionType;
Expand All @@ -34,6 +35,7 @@
import java.util.Optional;

import static com.facebook.presto.cost.LocalCostEstimate.addPartialComponents;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -165,6 +167,13 @@ public LocalCostEstimate visitUnion(UnionNode node, Void context)
return calculateRemoteGatherCost(inputSizeInBytes);
}

@Override
public LocalCostEstimate visitSequence(SequenceNode node, Void context)
{
return addPartialComponents(node.getSources().stream().map(n -> n.accept(this, context))
.collect(toImmutableList()));
}

@Override
public LocalCostEstimate visitIntersect(IntersectNode node, Void context)
{
Expand All @@ -189,6 +198,13 @@ public static LocalCostEstimate calculateRemoteRepartitionCost(double inputSizeI
return LocalCostEstimate.of(inputSizeInBytes, 0, inputSizeInBytes);
}

public static LocalCostEstimate calculateCteProducerCost(StatsProvider statsProvider, PlanNode source)
{
double inputSizeInBytes = statsProvider.getStats(source).getOutputSizeInBytes(source);
// default HDFS replication is 3
return LocalCostEstimate.of(3 * inputSizeInBytes, 0, 3 * inputSizeInBytes);
}

public static LocalCostEstimate calculateLocalRepartitionCost(double inputSizeInBytes)
{
return LocalCostEstimate.ofCpu(inputSizeInBytes);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.cost;

import com.facebook.presto.Session;
import com.facebook.presto.matching.Pattern;
import com.facebook.presto.spi.plan.CteConsumerNode;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.iterative.Lookup;

import java.util.Optional;

import static com.facebook.presto.sql.planner.plan.Patterns.cteConsumer;

public class CteConsumerStatsRule
implements ComposableStatsCalculator.Rule<CteConsumerNode>
{
private static final Pattern<CteConsumerNode> PATTERN = cteConsumer();

@Override
public Pattern<CteConsumerNode> getPattern()
{
return PATTERN;
}

@Override
public Optional<PlanNodeStatsEstimate> calculate(CteConsumerNode node, StatsProvider sourceStats, Lookup lookup, Session session, TypeProvider types)
{
return Optional.of(sourceStats.getStats(node.getOriginalSource()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.cost;

import com.facebook.presto.Session;
import com.facebook.presto.matching.Pattern;
import com.facebook.presto.spi.plan.CteProducerNode;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.iterative.Lookup;

import java.util.Optional;

import static com.facebook.presto.sql.planner.plan.Patterns.cteProducer;

public class CteProducerStatsRule
implements ComposableStatsCalculator.Rule<CteProducerNode>
{
private static final Pattern<CteProducerNode> PATTERN = cteProducer();

@Override
public Pattern<CteProducerNode> getPattern()
{
return PATTERN;
}

@Override
public Optional<PlanNodeStatsEstimate> calculate(CteProducerNode node, StatsProvider sourceStats, Lookup lookup, Session session, TypeProvider types)
{
return Optional.of(sourceStats.getStats(node.getSource()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.plan.PlanNode;

import java.util.List;
import java.util.stream.Stream;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand Down Expand Up @@ -106,4 +107,13 @@ public static LocalCostEstimate addPartialComponents(LocalCostEstimate one, Loca
a.maxMemory + b.maxMemory,
a.networkCost + b.networkCost));
}

public static LocalCostEstimate addPartialComponents(List<LocalCostEstimate> planList)
{
return planList.stream()
.reduce(zero(), (a, b) -> new LocalCostEstimate(
a.cpuCost + b.cpuCost,
a.maxMemory + b.maxMemory,
a.networkCost + b.networkCost));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.cost;

import com.facebook.presto.Session;
import com.facebook.presto.matching.Pattern;
import com.facebook.presto.spi.plan.SequenceNode;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.iterative.Lookup;

import java.util.Optional;

import static com.facebook.presto.sql.planner.plan.Patterns.sequenceNode;

public class SequenceStatsRule
implements ComposableStatsCalculator.Rule<SequenceNode>
{
private static final Pattern<SequenceNode> PATTERN = sequenceNode();

@Override
public Pattern<SequenceNode> getPattern()
{
return PATTERN;
}

@Override
public Optional<PlanNodeStatsEstimate> calculate(SequenceNode node, StatsProvider sourceStats, Lookup lookup, Session session, TypeProvider types)
{
return Optional.of(sourceStats.getStats(node.getPrimarySource()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public static ComposableStatsCalculator createComposableStatsCalculator(
rules.add(new SampleStatsRule(normalizer));
rules.add(new IntersectStatsRule(normalizer));
rules.add(new RemoteSourceStatsRule(fragmentStatsProvider, normalizer));
rules.add(new SequenceStatsRule());
rules.add(new CteProducerStatsRule());
rules.add(new CteConsumerStatsRule());

return new ComposableStatsCalculator(rules.build());
}
Expand Down
Loading