Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ public static Object processReduceSinkToHashJoin(ReduceSinkOperator parentRS, Ma
ExprNodeDesc realCol = parentRS.getColumnExprMap().get(prefix + "." + keyCol);
ColStatistics cs =
StatsUtils.getColStatisticsFromExpression(context.conf, stats, realCol);
if (cs == null || cs.getCountDistint() <= 0) {
if (cs == null || cs.getCountDistint() < 0) {
// unknown: same fallback as old "no stats / overloaded NDV=0" path
maxKeyCount = Long.MAX_VALUE;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public Object process(Node nd, Stack<Node> stack,
Statistics parentStats = groupByOperator.getParentOperators().get(0).getStatistics();
long ndvProduct = StatsUtils.computeNDVGroupingColumns(
colStats, parentStats, true);
// if ndvProduct is 0 then column stats state must be partial and we are missing
if (ndvProduct == 0) {
if (ndvProduct < 0) {
// unknown product - same fallback as old "overloaded NDV=0" path
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -924,7 +925,8 @@ private long computeMaxWriters() {
* Computes the partition cardinality based on column NDV statistics.
* @return positive value = estimated cardinality, 0 = no partition columns, -1 = stats unavailable
*/
private long computePartCardinality(List<Integer> partitionPos,
@VisibleForTesting
long computePartCardinality(List<Integer> partitionPos,
List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customPartitionExprs,
Statistics tStats, Operator<? extends OperatorDesc> fsParent,
ArrayList<ExprNodeDesc> allRSCols) {
Expand All @@ -935,7 +937,8 @@ private long computePartCardinality(List<Integer> partitionPos,
for (Integer idx : partitionPos) {
ColumnInfo ci = fsParent.getSchema().getSignature().get(idx);
ColStatistics partStats = tStats.getColumnStatisticsFromColName(ci.getInternalName());
if (partStats == null) {
// countDistinct < 0 means "unknown" - same path as missing stats
if (partStats == null || partStats.getCountDistint() < 0) {
return -1;
}
partCardinality *= partStats.getCountDistint();
Expand All @@ -950,7 +953,8 @@ private long computePartCardinality(List<Integer> partitionPos,
// implementations on UDFs (e.g. iceberg_bucket reports min(inputNDV, numBuckets))
ColStatistics exprStats = StatsUtils.getColStatisticsFromExpression(
this.parseCtx.getConf(), tStats, resolved);
if (exprStats == null) {
// countDistinct < 0 means "unknown" - same path as missing stats
if (exprStats == null || exprStats.getCountDistint() < 0) {
return -1;
}
partCardinality *= exprStats.getCountDistint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.calcite.rel.convert.Converter;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.Spool;
import com.google.common.annotations.VisibleForTesting;
import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMdDistinctRowCount;
import org.apache.calcite.rel.metadata.RelMdUtil;
Expand Down Expand Up @@ -50,7 +51,8 @@ public class HiveRelMdDistinctRowCount extends RelMdDistinctRowCount {
ReflectiveRelMetadataProvider.reflectiveSource(
BuiltInMethod.DISTINCT_ROW_COUNT.method, new HiveRelMdDistinctRowCount());

private HiveRelMdDistinctRowCount() {
@VisibleForTesting
HiveRelMdDistinctRowCount() {
}

public Double getDistinctRowCount(HiveTableScan htRel, RelMetadataQuery mq, ImmutableBitSet groupKey,
Expand All @@ -60,6 +62,9 @@ public Double getDistinctRowCount(HiveTableScan htRel, RelMetadataQuery mq, Immu
List<ColStatistics> colStats = htRel.getColStat(projIndxLst);
Double noDistinctRows = 1.0;
for (ColStatistics cStat : colStats) {
if (cStat.getCountDistint() <= 0) {
return 0.0;
}
noDistinctRows *= cStat.getCountDistint();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,18 +557,25 @@ private long evaluateInExpr(Statistics stats, ExprNodeDesc pred, long currNumRow
factor *= children.size() - 1;
}
for (int i = 0; i < columnStats.size(); i++) {
long dvs = columnStats.get(i) == null ? 0 : columnStats.get(i).getCountDistint();
ColStatistics cs = columnStats.get(i);
long dvs = cs == null ? -1L : cs.getCountDistint();
if (dvs == 0) {
factor *= 0.5;
continue;
// verified zero distinct values: IN cannot match any row
factor = 0;
break;
}
// (num of distinct vals for col in IN clause / num of distinct vals for col )
double columnFactor = 1.0 / dvs;
if (!multiColumn) {
columnFactor *= estimateIntersectionSize(aspCtx.getConf(), columnStats.get(i), values.get(i));
if (dvs < 0) {
// missing stats or unknown NDV
factor *= 0.5;
} else {
// (num of distinct vals for col in IN clause / num of distinct vals for col )
double columnFactor = 1.0 / dvs;
if (!multiColumn) {
columnFactor *= estimateIntersectionSize(aspCtx.getConf(), columnStats.get(i), values.get(i));
}
// max can be 1, even when ndv is larger in IN clause than in column stats
factor *= Math.min(columnFactor, 1.0);
}
// max can be 1, even when ndv is larger in IN clause than in column stats
factor *= Math.min(columnFactor, 1.0);
}

// Clamp at 1 to be sure that we don't get out of range.
Expand Down Expand Up @@ -1317,9 +1324,7 @@ private long evaluateChildExpr(Statistics stats, ExprNodeDesc child,

ColStatistics cs = stats.getColumnStatisticsFromColName(colName);
if (cs != null) {
long dvs = cs.getCountDistint();
numRows = dvs == 0 ? numRows / 2 : Math.round((double) numRows / dvs);
return numRows;
return rowsAfterEqualityFilter(numRows, cs.getCountDistint());
}
} else if (leaf instanceof ExprNodeColumnDesc) {
ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc) leaf;
Expand All @@ -1338,9 +1343,7 @@ private long evaluateChildExpr(Statistics stats, ExprNodeDesc child,

ColStatistics cs = stats.getColumnStatisticsFromColName(colName);
if (cs != null) {
long dvs = cs.getCountDistint();
numRows = dvs == 0 ? numRows / 2 : Math.round((double) numRows / dvs);
return numRows;
return rowsAfterEqualityFilter(numRows, cs.getCountDistint());
}
}
}
Expand Down Expand Up @@ -1380,6 +1383,16 @@ private long evaluateChildExpr(Statistics stats, ExprNodeDesc child,
return numRows / 2;
}

/**
 * Estimates how many rows remain after an equality predicate on a single column.
 *
 * <ul>
 *   <li>{@code dvs == 0}: no distinct values, so the estimate is 0 rows.</li>
 *   <li>{@code dvs > 0}: rows are spread evenly over the distinct values, i.e.
 *       {@code numRows / dvs} rounded to the nearest long.</li>
 *   <li>{@code dvs < 0}: the distinct count is not usable, so half of the input
 *       rows is returned as a fallback.</li>
 * </ul>
 *
 * @param numRows number of rows entering the filter
 * @param dvs     distinct-value count of the filtered column (callers pass the
 *                column statistics' count-distinct value)
 * @return the estimated number of rows that pass the filter
 */
private static long rowsAfterEqualityFilter(long numRows, long dvs) {
  // Zero is handled first: it would otherwise be a division by zero below.
  if (dvs == 0) {
    return 0;
  }
  // Positive NDV -> uniform-distribution estimate; negative NDV -> halve the input.
  return dvs > 0 ? Math.round((double) numRows / dvs) : numRows / 2;
}

}

/**
Expand Down Expand Up @@ -1518,14 +1531,12 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
// compute product of distinct values of grouping columns
long ndvProduct =
StatsUtils.computeNDVGroupingColumns(colStats, parentStats, false);
// if ndvProduct is 0 then column stats state must be partial and we are missing
// column stats for a group by column
if (ndvProduct == 0) {
if (ndvProduct < 0) {
// unknown - missing column stats or unknown NDV on a grouping column
ndvProduct = parentNumRows / 2;

if (LOG.isDebugEnabled()) {
LOG.debug("STATS-" + gop.toString() + ": ndvProduct became 0 as some column does not" +
" have stats. ndvProduct changed to: " + ndvProduct);
LOG.debug("STATS-{}: ndvProduct unknown; falling back to {}", gop, ndvProduct);
}
}
final long maxColumnNDV = colStats.stream()
Expand Down Expand Up @@ -1720,6 +1731,10 @@ static void computeAggregateColumnMinMax(ColStatistics cs, HiveConf conf, Aggreg
long valuesCount = agg.getDistinct() ?
parentCS.getCountDistint() :
parentStats.getNumRows() - numNulls;
// countDistinct < 0 would produce a Range with a negative maxValue
if (agg.getDistinct() && valuesCount < 0) {
return;
}
Range range = parentCS.getRange();
// Get the aggregate function matching the name in the query.
GenericUDAFResolver udaf =
Expand Down Expand Up @@ -1819,9 +1834,24 @@ private boolean checkMapSideAggregation(GroupByOperator gop,

// estimate size of key from column statistics
long avgKeySize = 0;
// lazily computed on first unknown NDV (null = not yet looked up)
Long parentNumRows = null;
for (ColStatistics cs : colStats) {
if (cs != null) {
numEstimatedRows = StatsUtils.safeMult(numEstimatedRows, cs.getCountDistint());
long ndv = cs.getCountDistint();
if (ndv < 0) {
if (parentNumRows == null) {
// unknown NDV: fall back to parentNumRows / 2, matching the heuristic
// used elsewhere in this file when GROUP BY cardinality cannot be computed
Statistics parentStats = gop.getParentOperators().get(0).getStatistics();
parentNumRows = (parentStats != null) ? parentStats.getNumRows() : -1L;
}
if (parentNumRows <= 0) {
return false;
}
ndv = parentNumRows / 2;
}
numEstimatedRows = StatsUtils.safeMult(numEstimatedRows, ndv);
avgKeySize += Math.ceil(cs.getAvgColLen());
}
}
Expand Down Expand Up @@ -2227,7 +2257,8 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
return null;
}

private long calculateUnmatchedRowsForOuter(HiveConf conf, long inputRowCount,
@VisibleForTesting
long calculateUnmatchedRowsForOuter(HiveConf conf, long inputRowCount,
List<String> joinKeys, Statistics statistics, long distinctUnmatched) {
// Extract the ndv from each of the columns involved in the join
List<Long> distinctVals = new ArrayList<>();
Expand All @@ -2248,14 +2279,15 @@ private long calculateUnmatchedRowsForOuter(HiveConf conf, long inputRowCount,
distinctVal = StatsUtils.addWithExpDecay(distinctVals);
}
}
// If we have a greater number of unmatched values than number of distinct values,
// we just return the number of rows in the input as we can assume there are no
// matches
if (distinctUnmatched >= distinctVal) {
// distinctVal <= 0 covers unknown (<0) and verified-zero (==0) cases; the latter means
// no key value matches anything, so every input row is unmatched in an outer join.
// distinctUnmatched < 0 (unknown) is treated conservatively the same way.
// If unmatched >= distinctVal, all rows can be assumed unmatched.
if (distinctVal <= 0 || distinctUnmatched < 0 || distinctUnmatched >= distinctVal) {
return inputRowCount;
}
// Otherwise, divide the number of input rows by the number of distinct values
// and divide by the number of distinct values unmatched
// and multiply by the number of distinct values unmatched
return StatsUtils.safeMult(inputRowCount / distinctVal, distinctUnmatched);
}

Expand Down Expand Up @@ -2604,7 +2636,8 @@ void updateNumNulls(ColStatistics colStats, long leftUnmatchedRows, long rightUn
colStats.setNumNulls(newNumNulls);
}

private void updateColStats(HiveConf conf, Statistics stats, long leftUnmatchedRows, long rightUnmatchedRows,
@VisibleForTesting
void updateColStats(HiveConf conf, Statistics stats, long leftUnmatchedRows, long rightUnmatchedRows,
long newNumRows, CommonJoinOperator<? extends JoinDesc> jop, Map<Integer, Long> rowCountParents) {

if (newNumRows < 0) {
Expand Down Expand Up @@ -2632,26 +2665,29 @@ private void updateColStats(HiveConf conf, Statistics stats, long leftUnmatchedR
int pos = jop.getConf().getReversedExprs().get(cs.getColumnName());
long oldDV = cs.getCountDistint();

boolean useCalciteForNdvReadjustment
= HiveConf.getBoolVar(conf, ConfVars.HIVE_STATS_JOIN_NDV_READJUSTMENT);
long newDV = oldDV;
if (useCalciteForNdvReadjustment) {
Double approxNdv = RelMdUtil.numDistinctVals(oldDV * 1.0, newNumRows * 1.0);
Preconditions.checkNotNull(approxNdv, "approximate NDV is null");
newDV = approxNdv.longValue();
} else {
long oldRowCount = rowCountParents.get(pos);
double ratio = (double) newNumRows / (double) oldRowCount;

// if ratio is greater than 1, then number of rows increases. This can happen
// when some operators like GROUPBY duplicates the input rows in which case
// number of distincts should not change. Update the distinct count only when
// the output number of rows is less than input number of rows.
if (ratio <= 1.0) {
newDV = (long) Math.ceil(ratio * oldDV);
// countDistinct < 0 means "unknown"
if (oldDV >= 0) {
boolean useCalciteForNdvReadjustment
= HiveConf.getBoolVar(conf, ConfVars.HIVE_STATS_JOIN_NDV_READJUSTMENT);
long newDV = oldDV;
if (useCalciteForNdvReadjustment) {
Double approxNdv = RelMdUtil.numDistinctVals(oldDV * 1.0, newNumRows * 1.0);
Preconditions.checkNotNull(approxNdv, "approximate NDV is null");
newDV = approxNdv.longValue();
} else {
long oldRowCount = rowCountParents.get(pos);
double ratio = (double) newNumRows / (double) oldRowCount;

// if ratio is greater than 1, then number of rows increases. This can happen
// when some operators like GROUPBY duplicates the input rows in which case
// number of distincts should not change. Update the distinct count only when
// the output number of rows is less than input number of rows.
if (ratio < 1.0) {
newDV = (long) Math.ceil(ratio * oldDV);
}
}
cs.setCountDistint(newDV);
}
cs.setCountDistint(newDV);
updateNumNulls(cs, leftUnmatchedRows, rightUnmatchedRows, newNumRows, pos, jop);
}
stats.setColumnStats(colStats);
Expand Down Expand Up @@ -2718,7 +2754,8 @@ private long computeFinalRowCount(List<Long> rowCountParents, long interimRowCou
return result;
}

private long computeRowCountAssumingInnerJoin(List<Long> rowCountParents, long denom,
@VisibleForTesting
long computeRowCountAssumingInnerJoin(List<Long> rowCountParents, long denom,
CommonJoinOperator<? extends JoinDesc> join) {
double factor = 0.0d;
long result = 1;
Expand All @@ -2734,7 +2771,12 @@ private long computeRowCountAssumingInnerJoin(List<Long> rowCountParents, long d
}
}

denom = denom == 0 ? 1 : denom;
// denom < 0 (unknown) and denom == 0 (verified-zero join key, cardinality formula
// degenerate) both fall back to "no constraint" rather than producing a negative
// factor or div-by-zero
if (denom <= 0) {
denom = 1;
}
factor = (double) max / (double) denom;

for (int i = 0; i < rowCountParents.size(); i++) {
Expand Down Expand Up @@ -2786,6 +2828,9 @@ private long getDenominatorForUnmatchedRows(List<Long> distinctVals) {
if (distinctVals.isEmpty()) {
return 2;
}
if (StatsUtils.containsUnknownNDV(distinctVals)) {
return -1L;
}

// simple join from 2 relations: denom = min(v1, v2)
if (distinctVals.size() <= 2) {
Expand Down Expand Up @@ -2826,6 +2871,9 @@ private long getDenominator(List<Long> distinctVals) {
// denominator is 2.
return 2;
}
if (StatsUtils.containsUnknownNDV(distinctVals)) {
return -1L;
}

// simple join from 2 relations: denom = max(v1, v2)
if (distinctVals.size() <= 2) {
Expand Down
6 changes: 5 additions & 1 deletion ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,11 @@ public void addToColumnStats(List<ColStatistics> colStats) {
} else {
existing.setNumNulls(StatsUtils.safeAdd(existing.getNumNulls(), cs.getNumNulls()));
}
existing.setCountDistint(Math.max(existing.getCountDistint(), cs.getCountDistint()));
if (cs.getCountDistint() < 0 || existing.getCountDistint() < 0) {
existing.setCountDistint(-1);
} else {
existing.setCountDistint(Math.max(existing.getCountDistint(), cs.getCountDistint()));
}
}
}
}
Expand Down
Loading
Loading