From eb9fe61944f0d7ffaf69506df78ac9a7fb5b7a0a Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 10 Jun 2026 19:36:59 +0800 Subject: [PATCH] Fix overflow edge cases in query utilities (#17875) (cherry picked from commit a725ded24717b54383401e11aa5e3680459a152c) --- .../router/leader/HashLeaderBalancer.java | 2 +- .../router/leader/HashLeaderBalancerTest.java | 65 ++++++ .../AsyncIoTConsensusServiceClient.java | 2 +- .../changing/ChangingValueFilter.java | 20 +- .../sdt/SwingingDoorTrendingFilter.java | 19 +- .../TumblingTimeSamplingProcessor.java | 7 +- .../sender/TwoStageAggregateSender.java | 2 +- .../tsfile/PipeTsFileResourceSegmentLock.java | 10 +- .../aggregation/ExtremeAccumulator.java | 32 ++- .../SlidingWindowAggregatorFactory.java | 36 ++-- .../fill/filter/FixedIntervalFillFilter.java | 10 +- .../operator/window/SessionWindow.java | 27 ++- .../operator/window/SessionWindowManager.java | 2 +- .../SimpleFragmentParallelPlanner.java | 3 +- ...SubscriptionPipeEventBatchSegmentLock.java | 4 +- .../changing/ChangingValueFilterTest.java | 55 +++++ .../sdt/SwingingDoorTrendingFilterTest.java | 56 +++++ .../TumblingTimeSamplingProcessorTest.java | 192 ++++++++++++++++++ .../PipeTsFileResourceSegmentLockTest.java | 71 +++++++ .../aggregation/AccumulatorTest.java | 46 +++++ .../SlidingWindowAggregatorFactoryTest.java | 41 ++++ .../filter/FixedIntervalFillFilterTest.java | 41 ++++ .../operator/window/SessionWindowTest.java | 70 +++++++ ...criptionPipeEventBatchSegmentLockTest.java | 50 +++++ .../ainode/AsyncAINodeServiceClient.java | 2 +- .../AsyncConfigNodeInternalServiceClient.java | 2 +- .../AsyncDataNodeExternalServiceClient.java | 2 +- .../AsyncDataNodeInternalServiceClient.java | 2 +- ...cDataNodeMPPDataExchangeServiceClient.java | 2 +- .../AsyncPipeConsensusServiceClient.java | 2 +- .../AsyncPipeDataTransferServiceClient.java | 2 +- 31 files changed, 822 insertions(+), 55 deletions(-) create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancerTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilterTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilterTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessorTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLockTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactoryTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLockTest.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java index aff023e82da8a..06ad4a76eb513 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java @@ -43,7 +43,7 @@ public Map generateOptimalLeaderDistribution( (gid, nodeSet) -> { List nodeList = new ArrayList<>(nodeSet); nodeList.sort(null); - int startNodeIndex = Math.abs(gid.hashCode()) % nodeList.size(); + int startNodeIndex = (int) (Math.abs((long) gid.hashCode()) % nodeList.size()); int finalNodeId = nodeList.get(startNodeIndex); for (int i = 0; i < nodeList.size(); i++) { int currentNodeIndex = (startNodeIndex + i) % nodeList.size(); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancerTest.java new file mode 100644 index 0000000000000..f2ccccd857d2f --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancerTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.load.balancer.router.leader; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +public class HashLeaderBalancerTest { + + private static final HashLeaderBalancer BALANCER = new HashLeaderBalancer(); + private static final int MIN_VALUE_HASH_REGION_ID = 268255235; + + @Test + public void minValueHashCodeTest() { + TConsensusGroupId regionGroupId = + new TConsensusGroupId(TConsensusGroupType.DataRegion, MIN_VALUE_HASH_REGION_ID); + Assert.assertEquals(Integer.MIN_VALUE, regionGroupId.hashCode()); + + Map> regionReplicaSetMap = new TreeMap<>(); + regionReplicaSetMap.put(regionGroupId, new HashSet<>(Arrays.asList(1, 2, 3))); + + Map dataNodeStatisticsMap = new TreeMap<>(); + dataNodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.Running)); + dataNodeStatisticsMap.put(2, new NodeStatistics(NodeStatus.Running)); + dataNodeStatisticsMap.put(3, new NodeStatistics(NodeStatus.Running)); + + Map leaderDistribution = + BALANCER.generateOptimalLeaderDistribution( + new TreeMap<>(), + regionReplicaSetMap, + new TreeMap<>(), + dataNodeStatisticsMap, + new TreeMap<>()); + + Assert.assertEquals(Integer.valueOf(3), leaderDistribution.get(regionGroupId)); + } +} diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java index cb635c8e6dd71..0d8d6eeede206 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java @@ -147,7 +147,7 @@ public PooledObject makeObject(TEndPoint endPoin new AsyncIoTConsensusServiceClient( thriftClientProperty, endPoint, - tManagers[clientCnt.incrementAndGet() % tManagers.length], + tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)], clientManager)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java index 7dc8c87c09b2d..9bb5dfcb9a761 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java @@ -59,13 +59,13 @@ public boolean filter(final long timestamp, final T value) { } private boolean tryFilter(final long timestamp, final T value) { - final long timeDiff = Math.abs(timestamp - lastStoredTimestamp); - - if (timeDiff <= processor.getCompressionMinTimeInterval()) { + if (isTimeDistanceLessThanOrEqual( + timestamp, lastStoredTimestamp, processor.getCompressionMinTimeInterval())) { return false; } - if (timeDiff >= processor.getCompressionMaxTimeInterval()) { + if (isTimeDistanceGreaterThanOrEqual( + timestamp, lastStoredTimestamp, processor.getCompressionMaxTimeInterval())) { reset(timestamp, value); return true; } @@ -94,6 +94,18 @@ private boolean tryFilter(final long timestamp, final T value) { return false; } + private boolean isTimeDistanceLessThanOrEqual( + final long left, final long right, final long maxDistance) { + final long distance = left >= right ? left - right : right - left; + return Long.compareUnsigned(distance, maxDistance) <= 0; + } + + private boolean isTimeDistanceGreaterThanOrEqual( + final long left, final long right, final long minDistance) { + final long distance = left >= right ? left - right : right - left; + return Long.compareUnsigned(distance, minDistance) >= 0; + } + private void reset(final long timestamp, final T value) { lastStoredTimestamp = timestamp; lastStoredValue = value; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java index 850cbd3ed0729..bfc0bc5d3f791 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java @@ -86,13 +86,14 @@ public boolean filter(final long timestamp, final T value) { private boolean tryFilter(final long timestamp, final T value) { final long timeDiff = timestamp - lastStoredTimestamp; - final long absTimeDiff = Math.abs(timeDiff); - if (absTimeDiff <= processor.getCompressionMinTimeInterval()) { + if (isTimeDistanceLessThanOrEqual( + timestamp, lastStoredTimestamp, processor.getCompressionMinTimeInterval())) { return false; } - if (absTimeDiff >= processor.getCompressionMaxTimeInterval()) { + if (isTimeDistanceGreaterThanOrEqual( + timestamp, lastStoredTimestamp, processor.getCompressionMaxTimeInterval())) { reset(timestamp, value); return true; } @@ -144,6 +145,18 @@ private boolean tryFilter(final long timestamp, final T value) { return false; } + private boolean isTimeDistanceLessThanOrEqual( + final long left, final long right, final long maxDistance) { + final long distance = left >= right ? left - right : right - left; + return Long.compareUnsigned(distance, maxDistance) <= 0; + } + + private boolean isTimeDistanceGreaterThanOrEqual( + final long left, final long right, final long minDistance) { + final long distance = left >= right ? left - right : right - left; + return Long.compareUnsigned(distance, minDistance) >= 0; + } + private void reset(final long timestamp, final T value) { upperDoor = Double.MIN_VALUE; lowerDoor = Double.MAX_VALUE; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java index 665f5781801a7..6d4e2187bb3ed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java @@ -113,7 +113,7 @@ protected void processRow( final Long lastSampleTime = pathLastObjectCache.getPartialPathLastObject(timeSeriesSuffix); if (lastSampleTime == null - || Math.abs(currentRowTime - lastSampleTime) >= intervalInCurrentPrecision) { + || isTimeDistanceGreaterThanOrEqual(currentRowTime, lastSampleTime)) { try { rowCollector.collectRow(row); @@ -132,4 +132,9 @@ protected void processRow( } } } + + private boolean isTimeDistanceGreaterThanOrEqual(long left, long right) { + long distance = left >= right ? left - right : right - left; + return Long.compareUnsigned(distance, intervalInCurrentPrecision) >= 0; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java index 3c36559a300e6..2f7ec3edb6f5b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java @@ -75,7 +75,7 @@ public synchronized TPipeTransferResp request(long watermark, TPipeTransferReq r final boolean endPointsChanged = tryFetchEndPointsIfNecessary(); tryConstructClients(endPointsChanged); - final TEndPoint endPoint = endPoints[(int) watermark % endPoints.length]; + final TEndPoint endPoint = endPoints[(int) Math.floorMod(watermark, endPoints.length)]; IoTDBSyncClient client = endPointIoTDBSyncClientMap.get(endPoint); if (client == null) { client = reconstructIoTDBSyncClient(endPoint); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java index cd1be83e55fd6..c8b8321bd3936 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java @@ -77,17 +77,21 @@ private void initIfNecessary() { public void lock(final File file) { initIfNecessary(); - locks[Math.abs(file.hashCode()) % locks.length].lock(); + locks[getLockIndex(file)].lock(); } public boolean tryLock(final File file, final long timeout, final TimeUnit timeUnit) throws InterruptedException { initIfNecessary(); - return locks[Math.abs(file.hashCode()) % locks.length].tryLock(timeout, timeUnit); + return locks[getLockIndex(file)].tryLock(timeout, timeUnit); } public void unlock(final File file) { initIfNecessary(); - locks[Math.abs(file.hashCode()) % locks.length].unlock(); + locks[getLockIndex(file)].unlock(); + } + + private int getLockIndex(final File file) { + return (int) (Math.abs((long) file.hashCode()) % locks.length); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java index 76a42b41c7180..16c385e0e9067 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java @@ -262,13 +262,9 @@ private void addIntInput(Column[] column, BitMap bitMap) { } private void updateIntResult(int extVal) { - int absExtVal = Math.abs(extVal); int candidateResult = extremeResult.getInt(); - int absCandidateResult = Math.abs(extremeResult.getInt()); - if (!initResult - || (absExtVal > absCandidateResult) - || (absExtVal == absCandidateResult) && extVal > candidateResult) { + if (!initResult || compareExtreme(extVal, candidateResult) > 0) { initResult = true; extremeResult.setInt(extVal); } @@ -287,13 +283,9 @@ private void addLongInput(Column[] column, BitMap bitMap) { } private void updateLongResult(long extVal) { - long absExtVal = Math.abs(extVal); long candidateResult = extremeResult.getLong(); - long absCandidateResult = Math.abs(extremeResult.getLong()); - if (!initResult - || (absExtVal > absCandidateResult) - || (absExtVal == absCandidateResult) && extVal > candidateResult) { + if (!initResult || compareExtreme(extVal, candidateResult) > 0) { initResult = true; extremeResult.setLong(extVal); } @@ -348,4 +340,24 @@ private void updateDoubleResult(double extVal) { extremeResult.setDouble(extVal); } } + + private int compareExtreme(int left, int right) { + int absComparison = Long.compare(Math.abs((long) left), Math.abs((long) right)); + return absComparison == 0 ? Integer.compare(left, right) : absComparison; + } + + private int compareExtreme(long left, long right) { + int absComparison = compareAbs(left, right); + return absComparison == 0 ? Long.compare(left, right) : absComparison; + } + + private int compareAbs(long left, long right) { + if (left == Long.MIN_VALUE) { + return right == Long.MIN_VALUE ? 0 : 1; + } + if (right == Long.MIN_VALUE) { + return -1; + } + return Long.compare(Math.abs(left), Math.abs(right)); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java index 572d41d518486..e7dfeb1573316 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java @@ -76,26 +76,14 @@ private SlidingWindowAggregatorFactory() {} (o1, o2) -> { int value1 = o1.getInt(0); int value2 = o2.getInt(0); - if (Math.abs(value1) > Math.abs(value2) - || (Math.abs(value1) == Math.abs(value2) && value1 > value2)) { - return 1; - } else if (value1 == value2) { - return 0; - } - return -1; + return compareExtreme(value1, value2); }); extremeComparators.put( TSDataType.INT64, (o1, o2) -> { long value1 = o1.getLong(0); long value2 = o2.getLong(0); - if (Math.abs(value1) > Math.abs(value2) - || (Math.abs(value1) == Math.abs(value2) && value1 > value2)) { - return 1; - } else if (value1 == value2) { - return 0; - } - return -1; + return compareExtreme(value1, value2); }); extremeComparators.put( TSDataType.FLOAT, @@ -237,4 +225,24 @@ public static SlidingWindowAggregator createSlidingWindowAggregator( throw new IllegalArgumentException("Invalid Aggregation Type: " + aggregationType); } } + + static int compareExtreme(int left, int right) { + int absComparison = Long.compare(Math.abs((long) left), Math.abs((long) right)); + return absComparison == 0 ? Integer.compare(left, right) : absComparison; + } + + static int compareExtreme(long left, long right) { + int absComparison = compareAbs(left, right); + return absComparison == 0 ? Long.compare(left, right) : absComparison; + } + + private static int compareAbs(long left, long right) { + if (left == Long.MIN_VALUE) { + return right == Long.MIN_VALUE ? 0 : 1; + } + if (right == Long.MIN_VALUE) { + return -1; + } + return Long.compare(Math.abs(left), Math.abs(right)); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/fill/filter/FixedIntervalFillFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/fill/filter/FixedIntervalFillFilter.java index 840148438a7d2..4d5a1e577dbbc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/fill/filter/FixedIntervalFillFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/fill/filter/FixedIntervalFillFilter.java @@ -34,6 +34,14 @@ public FixedIntervalFillFilter(long timeInterval) { public boolean needFill(long time, long previousTime) { // the reason that we use Math.abs is that we may use order by time desc which will cause // previousTime is larger than time - return Math.abs(time - previousTime) <= timeInterval; + return isTimeDistanceLessThanOrEqual(time, previousTime, timeInterval); + } + + private boolean isTimeDistanceLessThanOrEqual(long left, long right, long maxDistance) { + if (maxDistance < 0) { + return false; + } + long distance = left >= right ? left - right : right - left; + return Long.compareUnsigned(distance, maxDistance) <= 0; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java index 5c28c05b3ec03..8a5e48162b94d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java @@ -55,9 +55,9 @@ public boolean satisfy(Column column, int index) { return true; } if (index == 0) { - return Math.abs(column.getLong(index) - lastTsBlockTime) <= timeInterval; + return isTimeDistanceLessThanOrEqual(column.getLong(index), lastTsBlockTime); } - return Math.abs(column.getLong(index) - column.getLong(index - 1)) <= timeInterval; + return isTimeDistanceLessThanOrEqual(column.getLong(index), column.getLong(index - 1)); } @Override @@ -89,17 +89,18 @@ public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) { public boolean contains(Column column) { TimeColumn timeColumn = (TimeColumn) column; + long columnStartTime = column.getLong(0); long minTime = Math.min(timeColumn.getStartTime(), timeColumn.getEndTime()); long maxTime = Math.max(timeColumn.getStartTime(), timeColumn.getEndTime()); boolean contains = - Math.abs(column.getLong(0) - lastTsBlockTime) < timeInterval - && maxTime - minTime <= timeInterval; + isTimeDistanceLessThan(columnStartTime, lastTsBlockTime) + && isTimeDistanceLessThanOrEqual(maxTime, minTime); if (contains) { if (!initializedTimeValue) { startTime = Long.MAX_VALUE; endTime = Long.MIN_VALUE; - lastTsBlockTime = column.getLong(0); + lastTsBlockTime = columnStartTime; timeValue = ascending ? maxTime : minTime; initializedTimeValue = true; } @@ -114,6 +115,22 @@ public long getTimeInterval() { return timeInterval; } + boolean isTimeDistanceLessThanOrEqual(long left, long right) { + return compareTimeDistance(left, right) <= 0; + } + + private boolean isTimeDistanceLessThan(long left, long right) { + return compareTimeDistance(left, right) < 0; + } + + private int compareTimeDistance(long left, long right) { + if (timeInterval < 0) { + return 1; + } + long distance = left >= right ? left - right : right - left; + return Long.compareUnsigned(distance, timeInterval); + } + public long getTimeValue() { return timeValue; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowManager.java index a5504a8b135f9..48defbbdc1afb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowManager.java @@ -93,7 +93,7 @@ public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) { for (; i < size; i++) { long currentTime = timeColumn.getLong(i); - if (Math.abs(currentTime - previousTimeValue) > sessionWindow.getTimeInterval()) { + if (!sessionWindow.isTimeDistanceLessThanOrEqual(currentTime, previousTimeValue)) { sessionWindow.setTimeValue(previousTimeValue); break; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java index e0a193b63f1f3..ab89604105148 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -230,7 +230,8 @@ private TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplicaSe if (!selectRandomDataNode || queryContext.getSession() == null) { targetIndex = 0; } else { - targetIndex = (int) (queryContext.getSession().getSessionId() % availableDataNodes.size()); + targetIndex = + (int) Math.floorMod(queryContext.getSession().getSessionId(), availableDataNodes.size()); } return availableDataNodes.get(targetIndex); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java index 194eb5a8fa53b..0c91492e4b3ac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java @@ -65,11 +65,11 @@ private void initIfNecessary() { public void lock(final int regionId) { initIfNecessary(); - locks[regionId % locks.length].lock(); + locks[Math.floorMod(regionId, locks.length)].lock(); } public void unlock(final int regionId) { initIfNecessary(); - locks[regionId % locks.length].unlock(); + locks[Math.floorMod(regionId, locks.length)].unlock(); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilterTest.java new file mode 100644 index 0000000000000..2bf4da9957c18 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilterTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.downsampling.changing; + +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; + +public class ChangingValueFilterTest { + + @Test + public void testExtremeTimestampDistanceReachesMaxInterval() throws Exception { + final ChangingValueFilter filter = + new ChangingValueFilter<>(createProcessor(0, Long.MAX_VALUE, 0), Long.MIN_VALUE, 0); + + Assert.assertTrue(filter.filter(Long.MAX_VALUE, 0)); + } + + private ChangingValueSamplingProcessor createProcessor( + final long compressionMinTimeInterval, + final long compressionMaxTimeInterval, + final double compressionDeviation) + throws Exception { + final ChangingValueSamplingProcessor processor = new ChangingValueSamplingProcessor(); + setField(processor, "compressionMinTimeInterval", compressionMinTimeInterval); + setField(processor, "compressionMaxTimeInterval", compressionMaxTimeInterval); + setField(processor, "compressionDeviation", compressionDeviation); + return processor; + } + + private void setField(final Object target, final String fieldName, final Object value) + throws Exception { + final Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilterTest.java new file mode 100644 index 0000000000000..cd04df0419498 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilterTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.downsampling.sdt; + +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; + +public class SwingingDoorTrendingFilterTest { + + @Test + public void testExtremeTimestampDistanceReachesMaxInterval() throws Exception { + final SwingingDoorTrendingFilter filter = + new SwingingDoorTrendingFilter<>(createProcessor(0, Long.MAX_VALUE, 0), Long.MIN_VALUE, 0); + + Assert.assertTrue(filter.filter(Long.MAX_VALUE, 0)); + } + + private SwingingDoorTrendingSamplingProcessor createProcessor( + final long compressionMinTimeInterval, + final long compressionMaxTimeInterval, + final double compressionDeviation) + throws Exception { + final SwingingDoorTrendingSamplingProcessor processor = + new SwingingDoorTrendingSamplingProcessor(); + setField(processor, "compressionMinTimeInterval", compressionMinTimeInterval); + setField(processor, "compressionMaxTimeInterval", compressionMaxTimeInterval); + setField(processor, "compressionDeviation", compressionDeviation); + return processor; + } + + private void setField(final Object target, final String fieldName, final Object value) + throws Exception { + final Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessorTest.java new file mode 100644 index 0000000000000..df18dba434e7d --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessorTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.downsampling.tumbling; + +import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache; +import org.apache.iotdb.pipe.api.access.Row; +import org.apache.iotdb.pipe.api.collector.RowCollector; +import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; +import org.apache.iotdb.pipe.api.type.Binary; +import org.apache.iotdb.pipe.api.type.Type; + +import org.apache.tsfile.read.common.Path; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.time.LocalDate; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class TumblingTimeSamplingProcessorTest { + + @Test + public void testExtremeTimestampDistanceReachesInterval() throws Exception { + final TumblingTimeSamplingProcessor processor = new TumblingTimeSamplingProcessor(); + final TestLastObjectCache cache = new TestLastObjectCache(); + setField(processor, "intervalInCurrentPrecision", Long.MAX_VALUE); + setField(processor, "pathLastObjectCache", cache); + + try { + final CountingRowCollector rowCollector = new CountingRowCollector(); + + processor.processRow( + new TestRow(Long.MIN_VALUE), rowCollector, "root.db.d1", new AtomicReference<>()); + processor.processRow( + new TestRow(Long.MAX_VALUE), rowCollector, "root.db.d1", new AtomicReference<>()); + processor.processRow( + new TestRow(Long.MAX_VALUE - 1), rowCollector, "root.db.d1", new AtomicReference<>()); + + Assert.assertEquals(2, rowCollector.getCollectedRowCount()); + } finally { + cache.close(); + } + } + + private void setField(final Object target, final String fieldName, final Object value) + throws Exception { + final Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } + + private static class TestLastObjectCache extends PartialPathLastObjectCache { + + private TestLastObjectCache() { + super(1024); + } + + @Override + protected long calculateMemoryUsage(final Long object) { + return Long.BYTES; + } + } + + private static class CountingRowCollector implements RowCollector { + + private final AtomicInteger collectedRowCount = new AtomicInteger(); + + @Override + public void collectRow(final Row row) throws IOException { + collectedRowCount.incrementAndGet(); + } + + private int getCollectedRowCount() { + return collectedRowCount.get(); + } + } + + private static class TestRow implements Row { + + private final long timestamp; + + private TestRow(final long timestamp) { + this.timestamp = timestamp; + } + + @Override + public long getTime() { + return timestamp; + } + + @Override + public int getInt(final int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public LocalDate getDate(final int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public long getLong(final int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public float getFloat(final int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public double getDouble(final int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean getBoolean(final int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public Binary getBinary(final int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public String getString(final int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public Object getObject(final int columnIndex) { + return 1; + } + + @Override + public Type getDataType(final int columnIndex) { + return Type.INT32; + } + + @Override + public boolean isNull(final int columnIndex) { + return false; + } + + @Override + public int size() { + return 1; + } + + @Override + public int getColumnIndex(final Path columnName) throws PipeParameterNotValidException { + return 0; + } + + @Override + public String getColumnName(final int columnIndex) { + return "s1"; + } + + @Override + public List getColumnTypes() { + return Collections.singletonList(Type.INT32); + } + + @Override + public String getDeviceId() { + return "root.db.d1"; + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLockTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLockTest.java new file mode 100644 index 0000000000000..e1b53e0dc6ca8 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLockTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.resource.tsfile; + +import org.apache.iotdb.commons.conf.CommonDescriptor; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +public class PipeTsFileResourceSegmentLockTest { + + @Test + public void minValueHashCodeTest() throws InterruptedException { + int originalSegmentLockNum = + CommonDescriptor.getInstance().getConfig().getPipeTsFileResourceSegmentLockNum(); + CommonDescriptor.getInstance().getConfig().setPipeTsFileResourceSegmentLockNum(32); + + try { + PipeTsFileResourceSegmentLock segmentLock = new PipeTsFileResourceSegmentLock(); + File file = new MinValueHashCodeFile("target/min-value-hash-code.tsfile"); + + Assert.assertEquals(Integer.MIN_VALUE, file.hashCode()); + + segmentLock.lock(file); + try { + Assert.assertTrue(segmentLock.tryLock(file, 1, TimeUnit.MILLISECONDS)); + segmentLock.unlock(file); + } finally { + segmentLock.unlock(file); + } + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeTsFileResourceSegmentLockNum(originalSegmentLockNum); + } + } + + private static class MinValueHashCodeFile extends File { + + private static final long serialVersionUID = 1L; + + private MinValueHashCodeFile(String pathname) { + super(pathname); + } + + @Override + public int hashCode() { + return Integer.MIN_VALUE; + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java index 45ea1ddc9cdc5..7f358bd651512 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java @@ -270,6 +270,52 @@ public void extremeAccumulatorTest() { Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001); } + @Test + public void extremeAccumulatorMinValueTest() { + Accumulator intAccumulator = + AccumulatorFactory.createBuiltinAccumulator( + TAggregationType.EXTREME, + Collections.singletonList(TSDataType.INT32), + Collections.emptyList(), + Collections.emptyMap(), + true); + TsBlockBuilder intBlockBuilder = + new TsBlockBuilder(Collections.singletonList(TSDataType.INT32)); + intBlockBuilder.getTimeColumnBuilder().writeLong(0); + intBlockBuilder.getValueColumnBuilders()[0].writeInt(Integer.MAX_VALUE); + intBlockBuilder.declarePosition(); + intBlockBuilder.getTimeColumnBuilder().writeLong(1); + intBlockBuilder.getValueColumnBuilders()[0].writeInt(Integer.MIN_VALUE); + intBlockBuilder.declarePosition(); + TsBlock intBlock = intBlockBuilder.build(); + intAccumulator.addInput(new Column[] {intBlock.getTimeColumn(), intBlock.getColumn(0)}, null); + ColumnBuilder intFinalResult = new IntColumnBuilder(null, 1); + intAccumulator.outputFinal(intFinalResult); + Assert.assertEquals(Integer.MIN_VALUE, intFinalResult.build().getInt(0)); + + Accumulator longAccumulator = + AccumulatorFactory.createBuiltinAccumulator( + TAggregationType.EXTREME, + Collections.singletonList(TSDataType.INT64), + Collections.emptyList(), + Collections.emptyMap(), + true); + TsBlockBuilder longBlockBuilder = + new TsBlockBuilder(Collections.singletonList(TSDataType.INT64)); + longBlockBuilder.getTimeColumnBuilder().writeLong(0); + longBlockBuilder.getValueColumnBuilders()[0].writeLong(Long.MAX_VALUE); + longBlockBuilder.declarePosition(); + longBlockBuilder.getTimeColumnBuilder().writeLong(1); + longBlockBuilder.getValueColumnBuilders()[0].writeLong(Long.MIN_VALUE); + longBlockBuilder.declarePosition(); + TsBlock longBlock = longBlockBuilder.build(); + longAccumulator.addInput( + new Column[] {longBlock.getTimeColumn(), longBlock.getColumn(0)}, null); + ColumnBuilder longFinalResult = new LongColumnBuilder(null, 1); + longAccumulator.outputFinal(longFinalResult); + Assert.assertEquals(Long.MIN_VALUE, longFinalResult.build().getLong(0)); + } + @Test public void firstValueAccumulatorTest() { Accumulator firstValueAccumulator = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactoryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactoryTest.java new file mode 100644 index 0000000000000..a029441b4dc80 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactoryTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.aggregation.slidingwindow; + +import org.junit.Assert; +import org.junit.Test; + +public class SlidingWindowAggregatorFactoryTest { + + @Test + public void compareExtremeMinValueTest() { + Assert.assertTrue( + SlidingWindowAggregatorFactory.compareExtreme(Integer.MIN_VALUE, Integer.MAX_VALUE) > 0); + Assert.assertTrue( + SlidingWindowAggregatorFactory.compareExtreme(Integer.MAX_VALUE, Integer.MIN_VALUE) < 0); + Assert.assertTrue(SlidingWindowAggregatorFactory.compareExtreme(1, -1) > 0); + + Assert.assertTrue( + SlidingWindowAggregatorFactory.compareExtreme(Long.MIN_VALUE, Long.MAX_VALUE) > 0); + Assert.assertTrue( + SlidingWindowAggregatorFactory.compareExtreme(Long.MAX_VALUE, Long.MIN_VALUE) < 0); + Assert.assertTrue(SlidingWindowAggregatorFactory.compareExtreme(1L, -1L) > 0); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java new file mode 100644 index 0000000000000..4b9e583d6460b --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.fill.filter; + +import org.junit.Assert; +import org.junit.Test; + +public class FixedIntervalFillFilterTest { + + @Test + public void needFillHandlesOverflowedTimeDistance() { + FixedIntervalFillFilter filter = new FixedIntervalFillFilter(1); + + Assert.assertTrue(filter.needFill(Long.MIN_VALUE, Long.MIN_VALUE + 1)); + Assert.assertFalse(filter.needFill(Long.MIN_VALUE, Long.MAX_VALUE)); + } + + @Test + public void needFillRejectsNegativeInterval() { + FixedIntervalFillFilter filter = new FixedIntervalFillFilter(-1); + + Assert.assertFalse(filter.needFill(0, 0)); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java new file mode 100644 index 0000000000000..ea534a76ebc82 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.window; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; + +public class SessionWindowTest { + + @Test + public void satisfyHandlesOverflowedTimeDistance() { + SessionWindow window = new SessionWindow(1, true); + Column timeColumn = buildTimeColumn(Long.MIN_VALUE, Long.MAX_VALUE); + window.mergeOnePoint(new Column[] {timeColumn}, 0); + + Assert.assertFalse(window.satisfy(timeColumn, 1)); + } + + @Test + public void skipPointsOutOfCurWindowHandlesOverflowedTimeDistance() { + SessionWindowManager manager = new SessionWindowManager(false, 1, true); + manager.initCurWindow(); + Column previousTimeColumn = buildTimeColumn(Long.MIN_VALUE); + manager.getCurWindow().mergeOnePoint(new Column[] {previousTimeColumn}, 0); + manager.next(); + + TsBlock nextBlock = buildTsBlock(Long.MAX_VALUE); + TsBlock skippedBlock = manager.skipPointsOutOfCurWindow(nextBlock); + + Assert.assertEquals(1, skippedBlock.getPositionCount()); + Assert.assertEquals(Long.MAX_VALUE, skippedBlock.getTimeColumn().getLong(0)); + } + + private Column buildTimeColumn(long... timestamps) { + return buildTsBlock(timestamps).getTimeColumn(); + } + + private TsBlock buildTsBlock(long... timestamps) { + TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32)); + for (long timestamp : timestamps) { + builder.getTimeColumnBuilder().writeLong(timestamp); + builder.getColumnBuilder(0).appendNull(); + builder.declarePosition(); + } + return builder.build(); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLockTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLockTest.java new file mode 100644 index 0000000000000..7488f5ffa26e4 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLockTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.subscription.event.batch; + +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.concurrent.locks.ReentrantLock; + +public class SubscriptionPipeEventBatchSegmentLockTest { + + @Test + public void negativeRegionIdTest() throws Exception { + final SubscriptionPipeEventBatchSegmentLock segmentLock = + new SubscriptionPipeEventBatchSegmentLock(); + + segmentLock.lock(-1); + try { + final ReentrantLock[] locks = getLocks(segmentLock); + Assert.assertTrue(locks[locks.length - 1].isHeldByCurrentThread()); + } finally { + segmentLock.unlock(-1); + } + } + + private ReentrantLock[] getLocks(final SubscriptionPipeEventBatchSegmentLock segmentLock) + throws Exception { + final Field locksField = SubscriptionPipeEventBatchSegmentLock.class.getDeclaredField("locks"); + locksField.setAccessible(true); + return (ReentrantLock[]) locksField.get(segmentLock); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AsyncAINodeServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AsyncAINodeServiceClient.java index 3276923deb78e..b6817d0c52848 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AsyncAINodeServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AsyncAINodeServiceClient.java @@ -130,7 +130,7 @@ public PooledObject makeObject(TEndPoint endPoint) thr new AsyncAINodeServiceClient( thriftClientProperty, endPoint, - tManagers[clientCnt.incrementAndGet() % tManagers.length], + tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)], clientManager)); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java index 1b3d6a21a1f1b..33b0f217122eb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java @@ -177,7 +177,7 @@ public PooledObject makeObject(TEndPoint e new AsyncConfigNodeInternalServiceClient( thriftClientProperty, endPoint, - tManagers[clientCnt.incrementAndGet() % tManagers.length], + tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)], clientManager)); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java index 5de58b8eef14d..472739c1e12bc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java @@ -161,7 +161,7 @@ public PooledObject makeObject(TEndPoint end new AsyncDataNodeExternalServiceClient( thriftClientProperty, endPoint, - tManagers[clientCnt.incrementAndGet() % tManagers.length], + tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)], clientManager)); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java index e56aab91c0dc3..dc485690b38f3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java @@ -189,7 +189,7 @@ public PooledObject makeObject(TEndPoint end new AsyncDataNodeInternalServiceClient( thriftClientProperty, endPoint, - tManagers[clientCnt.incrementAndGet() % tManagers.length], + tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)], clientManager)); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java index 6434ec7f017b9..306721d08eda7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java @@ -149,7 +149,7 @@ public PooledObject makeObject(TEndPo new AsyncDataNodeMPPDataExchangeServiceClient( thriftClientProperty, endPoint, - tManagers[clientCnt.incrementAndGet() % tManagers.length], + tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)], clientManager)); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeConsensusServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeConsensusServiceClient.java index 6f05350c0891f..5579429e745f2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeConsensusServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeConsensusServiceClient.java @@ -158,7 +158,7 @@ public PooledObject makeObject(TEndPoint endPoi new AsyncPipeConsensusServiceClient( thriftClientProperty, endPoint, - tManagers[clientCnt.incrementAndGet() % tManagers.length], + tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)], clientManager)); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java index 2cdfbd865c817..96a819eceeb65 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java @@ -233,7 +233,7 @@ public PooledObject makeObject(final TEndPoi new AsyncPipeDataTransferServiceClient( thriftClientProperty, endPoint, - tManagers[clientCnt.incrementAndGet() % tManagers.length], + tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)], clientManager)); }