Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
(gid, nodeSet) -> {
List<Integer> nodeList = new ArrayList<>(nodeSet);
nodeList.sort(null);
int startNodeIndex = Math.abs(gid.hashCode()) % nodeList.size();
int startNodeIndex = (int) (Math.abs((long) gid.hashCode()) % nodeList.size());
int finalNodeId = nodeList.get(startNodeIndex);
for (int i = 0; i < nodeList.size(); i++) {
int currentNodeIndex = (startNodeIndex + i) % nodeList.size();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.manager.load.balancer.router.leader;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;

import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class HashLeaderBalancerTest {

private static final HashLeaderBalancer BALANCER = new HashLeaderBalancer();
private static final int MIN_VALUE_HASH_REGION_ID = 268255235;

@Test
public void minValueHashCodeTest() {
TConsensusGroupId regionGroupId =
new TConsensusGroupId(TConsensusGroupType.DataRegion, MIN_VALUE_HASH_REGION_ID);
Assert.assertEquals(Integer.MIN_VALUE, regionGroupId.hashCode());

Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>();
regionReplicaSetMap.put(regionGroupId, new HashSet<>(Arrays.asList(1, 2, 3)));

Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>();
dataNodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.Running));
dataNodeStatisticsMap.put(2, new NodeStatistics(NodeStatus.Running));
dataNodeStatisticsMap.put(3, new NodeStatistics(NodeStatus.Running));

Map<TConsensusGroupId, Integer> leaderDistribution =
BALANCER.generateOptimalLeaderDistribution(
new TreeMap<>(),
regionReplicaSetMap,
new TreeMap<>(),
dataNodeStatisticsMap,
new TreeMap<>());

Assert.assertEquals(Integer.valueOf(3), leaderDistribution.get(regionGroupId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public PooledObject<AsyncIoTConsensusServiceClient> makeObject(TEndPoint endPoin
new AsyncIoTConsensusServiceClient(
thriftClientProperty,
endPoint,
tManagers[clientCnt.incrementAndGet() % tManagers.length],
tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)],
clientManager));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ public boolean filter(final long timestamp, final T value) {
}

private boolean tryFilter(final long timestamp, final T value) {
final long timeDiff = Math.abs(timestamp - lastStoredTimestamp);

if (timeDiff <= processor.getCompressionMinTimeInterval()) {
if (isTimeDistanceLessThanOrEqual(
timestamp, lastStoredTimestamp, processor.getCompressionMinTimeInterval())) {
return false;
}

if (timeDiff >= processor.getCompressionMaxTimeInterval()) {
if (isTimeDistanceGreaterThanOrEqual(
timestamp, lastStoredTimestamp, processor.getCompressionMaxTimeInterval())) {
reset(timestamp, value);
return true;
}
Expand Down Expand Up @@ -94,6 +94,18 @@ private boolean tryFilter(final long timestamp, final T value) {
return false;
}

private boolean isTimeDistanceLessThanOrEqual(
final long left, final long right, final long maxDistance) {
final long distance = left >= right ? left - right : right - left;
return Long.compareUnsigned(distance, maxDistance) <= 0;
}

private boolean isTimeDistanceGreaterThanOrEqual(
final long left, final long right, final long minDistance) {
final long distance = left >= right ? left - right : right - left;
return Long.compareUnsigned(distance, minDistance) >= 0;
}

private void reset(final long timestamp, final T value) {
lastStoredTimestamp = timestamp;
lastStoredValue = value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,14 @@ public boolean filter(final long timestamp, final T value) {

private boolean tryFilter(final long timestamp, final T value) {
final long timeDiff = timestamp - lastStoredTimestamp;
final long absTimeDiff = Math.abs(timeDiff);

if (absTimeDiff <= processor.getCompressionMinTimeInterval()) {
if (isTimeDistanceLessThanOrEqual(
timestamp, lastStoredTimestamp, processor.getCompressionMinTimeInterval())) {
return false;
}

if (absTimeDiff >= processor.getCompressionMaxTimeInterval()) {
if (isTimeDistanceGreaterThanOrEqual(
timestamp, lastStoredTimestamp, processor.getCompressionMaxTimeInterval())) {
reset(timestamp, value);
return true;
}
Expand Down Expand Up @@ -144,6 +145,18 @@ private boolean tryFilter(final long timestamp, final T value) {
return false;
}

private boolean isTimeDistanceLessThanOrEqual(
final long left, final long right, final long maxDistance) {
final long distance = left >= right ? left - right : right - left;
return Long.compareUnsigned(distance, maxDistance) <= 0;
}

private boolean isTimeDistanceGreaterThanOrEqual(
final long left, final long right, final long minDistance) {
final long distance = left >= right ? left - right : right - left;
return Long.compareUnsigned(distance, minDistance) >= 0;
}

private void reset(final long timestamp, final T value) {
upperDoor = Double.MIN_VALUE;
lowerDoor = Double.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ protected void processRow(
final Long lastSampleTime = pathLastObjectCache.getPartialPathLastObject(timeSeriesSuffix);

if (lastSampleTime == null
|| Math.abs(currentRowTime - lastSampleTime) >= intervalInCurrentPrecision) {
|| isTimeDistanceGreaterThanOrEqual(currentRowTime, lastSampleTime)) {
try {
rowCollector.collectRow(row);

Expand All @@ -132,4 +132,9 @@ protected void processRow(
}
}
}

private boolean isTimeDistanceGreaterThanOrEqual(long left, long right) {
long distance = left >= right ? left - right : right - left;
return Long.compareUnsigned(distance, intervalInCurrentPrecision) >= 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public synchronized TPipeTransferResp request(long watermark, TPipeTransferReq r
final boolean endPointsChanged = tryFetchEndPointsIfNecessary();
tryConstructClients(endPointsChanged);

final TEndPoint endPoint = endPoints[(int) watermark % endPoints.length];
final TEndPoint endPoint = endPoints[(int) Math.floorMod(watermark, endPoints.length)];
IoTDBSyncClient client = endPointIoTDBSyncClientMap.get(endPoint);
if (client == null) {
client = reconstructIoTDBSyncClient(endPoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,21 @@ private void initIfNecessary() {

public void lock(final File file) {
initIfNecessary();
locks[Math.abs(file.hashCode()) % locks.length].lock();
locks[getLockIndex(file)].lock();
}

public boolean tryLock(final File file, final long timeout, final TimeUnit timeUnit)
throws InterruptedException {
initIfNecessary();
return locks[Math.abs(file.hashCode()) % locks.length].tryLock(timeout, timeUnit);
return locks[getLockIndex(file)].tryLock(timeout, timeUnit);
}

public void unlock(final File file) {
initIfNecessary();
locks[Math.abs(file.hashCode()) % locks.length].unlock();
locks[getLockIndex(file)].unlock();
}

private int getLockIndex(final File file) {
return (int) (Math.abs((long) file.hashCode()) % locks.length);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,9 @@ private void addIntInput(Column[] column, BitMap bitMap) {
}

private void updateIntResult(int extVal) {
int absExtVal = Math.abs(extVal);
int candidateResult = extremeResult.getInt();
int absCandidateResult = Math.abs(extremeResult.getInt());

if (!initResult
|| (absExtVal > absCandidateResult)
|| (absExtVal == absCandidateResult) && extVal > candidateResult) {
if (!initResult || compareExtreme(extVal, candidateResult) > 0) {
initResult = true;
extremeResult.setInt(extVal);
}
Expand All @@ -287,13 +283,9 @@ private void addLongInput(Column[] column, BitMap bitMap) {
}

private void updateLongResult(long extVal) {
long absExtVal = Math.abs(extVal);
long candidateResult = extremeResult.getLong();
long absCandidateResult = Math.abs(extremeResult.getLong());

if (!initResult
|| (absExtVal > absCandidateResult)
|| (absExtVal == absCandidateResult) && extVal > candidateResult) {
if (!initResult || compareExtreme(extVal, candidateResult) > 0) {
initResult = true;
extremeResult.setLong(extVal);
}
Expand Down Expand Up @@ -348,4 +340,24 @@ private void updateDoubleResult(double extVal) {
extremeResult.setDouble(extVal);
}
}

private int compareExtreme(int left, int right) {
int absComparison = Long.compare(Math.abs((long) left), Math.abs((long) right));
return absComparison == 0 ? Integer.compare(left, right) : absComparison;
}

private int compareExtreme(long left, long right) {
int absComparison = compareAbs(left, right);
return absComparison == 0 ? Long.compare(left, right) : absComparison;
}

private int compareAbs(long left, long right) {
if (left == Long.MIN_VALUE) {
return right == Long.MIN_VALUE ? 0 : 1;
}
if (right == Long.MIN_VALUE) {
return -1;
}
return Long.compare(Math.abs(left), Math.abs(right));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,26 +76,14 @@ private SlidingWindowAggregatorFactory() {}
(o1, o2) -> {
int value1 = o1.getInt(0);
int value2 = o2.getInt(0);
if (Math.abs(value1) > Math.abs(value2)
|| (Math.abs(value1) == Math.abs(value2) && value1 > value2)) {
return 1;
} else if (value1 == value2) {
return 0;
}
return -1;
return compareExtreme(value1, value2);
});
extremeComparators.put(
TSDataType.INT64,
(o1, o2) -> {
long value1 = o1.getLong(0);
long value2 = o2.getLong(0);
if (Math.abs(value1) > Math.abs(value2)
|| (Math.abs(value1) == Math.abs(value2) && value1 > value2)) {
return 1;
} else if (value1 == value2) {
return 0;
}
return -1;
return compareExtreme(value1, value2);
});
extremeComparators.put(
TSDataType.FLOAT,
Expand Down Expand Up @@ -237,4 +225,24 @@ public static SlidingWindowAggregator createSlidingWindowAggregator(
throw new IllegalArgumentException("Invalid Aggregation Type: " + aggregationType);
}
}

static int compareExtreme(int left, int right) {
int absComparison = Long.compare(Math.abs((long) left), Math.abs((long) right));
return absComparison == 0 ? Integer.compare(left, right) : absComparison;
}

static int compareExtreme(long left, long right) {
int absComparison = compareAbs(left, right);
return absComparison == 0 ? Long.compare(left, right) : absComparison;
}

private static int compareAbs(long left, long right) {
if (left == Long.MIN_VALUE) {
return right == Long.MIN_VALUE ? 0 : 1;
}
if (right == Long.MIN_VALUE) {
return -1;
}
return Long.compare(Math.abs(left), Math.abs(right));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ public FixedIntervalFillFilter(long timeInterval) {
public boolean needFill(long time, long previousTime) {
// the reason that we use Math.abs is that we may use order by time desc which will cause
// previousTime is larger than time
return Math.abs(time - previousTime) <= timeInterval;
return isTimeDistanceLessThanOrEqual(time, previousTime, timeInterval);
}

private boolean isTimeDistanceLessThanOrEqual(long left, long right, long maxDistance) {
if (maxDistance < 0) {
return false;
}
long distance = left >= right ? left - right : right - left;
return Long.compareUnsigned(distance, maxDistance) <= 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public boolean satisfy(Column column, int index) {
return true;
}
if (index == 0) {
return Math.abs(column.getLong(index) - lastTsBlockTime) <= timeInterval;
return isTimeDistanceLessThanOrEqual(column.getLong(index), lastTsBlockTime);
}
return Math.abs(column.getLong(index) - column.getLong(index - 1)) <= timeInterval;
return isTimeDistanceLessThanOrEqual(column.getLong(index), column.getLong(index - 1));
}

@Override
Expand Down Expand Up @@ -89,17 +89,18 @@ public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
public boolean contains(Column column) {
TimeColumn timeColumn = (TimeColumn) column;

long columnStartTime = column.getLong(0);
long minTime = Math.min(timeColumn.getStartTime(), timeColumn.getEndTime());
long maxTime = Math.max(timeColumn.getStartTime(), timeColumn.getEndTime());

boolean contains =
Math.abs(column.getLong(0) - lastTsBlockTime) < timeInterval
&& maxTime - minTime <= timeInterval;
isTimeDistanceLessThan(columnStartTime, lastTsBlockTime)
&& isTimeDistanceLessThanOrEqual(maxTime, minTime);
if (contains) {
if (!initializedTimeValue) {
startTime = Long.MAX_VALUE;
endTime = Long.MIN_VALUE;
lastTsBlockTime = column.getLong(0);
lastTsBlockTime = columnStartTime;
timeValue = ascending ? maxTime : minTime;
initializedTimeValue = true;
}
Expand All @@ -114,6 +115,22 @@ public long getTimeInterval() {
return timeInterval;
}

boolean isTimeDistanceLessThanOrEqual(long left, long right) {
return compareTimeDistance(left, right) <= 0;
}

private boolean isTimeDistanceLessThan(long left, long right) {
return compareTimeDistance(left, right) < 0;
}

private int compareTimeDistance(long left, long right) {
if (timeInterval < 0) {
return 1;
}
long distance = left >= right ? left - right : right - left;
return Long.compareUnsigned(distance, timeInterval);
}

public long getTimeValue() {
return timeValue;
}
Expand Down
Loading
Loading