Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;

import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.io.UncheckedIOException;
Expand All @@ -44,82 +45,66 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ConnectionRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.MiniClusterRule;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule;
import org.apache.hadoop.hbase.trace.OpenTelemetryTestRule;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.hamcrest.Matcher;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.rules.RuleChain;
import org.junit.rules.TestName;
import org.junit.rules.TestRule;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.provider.Arguments;

public abstract class AbstractTestAsyncTableScan {
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

protected static final OpenTelemetryClassRule OTEL_CLASS_RULE = OpenTelemetryClassRule.create();
public abstract class AbstractTestAsyncTableScan {

private static Configuration createConfiguration() {
Configuration conf = new Configuration();
// Disable directory sharing to prevent race conditions when tests run in parallel.
// Each test instance gets its own isolated directories to avoid one test's tearDown()
// deleting directories another parallel test is still using.
conf.setBoolean("hbase.test.disable-directory-sharing", true);
return conf;
}
@RegisterExtension
protected static final OpenTelemetryExtension OTEL_EXT = OpenTelemetryExtension.create();

protected static final MiniClusterRule MINI_CLUSTER_RULE =
MiniClusterRule.newBuilder().setConfiguration(createConfiguration())
.setMiniClusterOption(StartTestingClusterOption.builder().numWorkers(3).build()).build();
protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

protected static final ConnectionRule CONN_RULE =
ConnectionRule.createAsyncConnectionRule(MINI_CLUSTER_RULE::createAsyncConnection);
protected static AsyncConnection CONN;

private static final class Setup extends ExternalResource {
@Override
protected void before() throws Throwable {
final HBaseTestingUtil testingUtil = MINI_CLUSTER_RULE.getTestingUtility();
final AsyncConnection conn = CONN_RULE.getAsyncConnection();
protected String methodName;

byte[][] splitKeys = new byte[8][];
for (int i = 111; i < 999; i += 111) {
splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
}
testingUtil.createTable(TABLE_NAME, FAMILY, splitKeys);
testingUtil.waitTableAvailable(TABLE_NAME);
conn.getTable(TABLE_NAME)
.putAll(IntStream.range(0, COUNT)
.mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
.addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))
.collect(Collectors.toList()))
.get();
@BeforeAll
public static void setUpBeforeClass() throws Exception {
UTIL.startMiniCluster(3);
byte[][] splitKeys = new byte[8][];
for (int i = 111; i < 999; i += 111) {
splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
}
UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
UTIL.waitTableAvailable(TABLE_NAME);
try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
table.put(IntStream.range(0, COUNT)
.mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
.addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))
.collect(Collectors.toList()));
}
CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
}

@ClassRule
public static final TestRule classRule = RuleChain.outerRule(OTEL_CLASS_RULE)
.around(MINI_CLUSTER_RULE).around(CONN_RULE).around(new Setup());

@Rule
public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(OTEL_CLASS_RULE);
@AfterAll
public static void tearDownAfterClass() throws Exception {
Closeables.close(CONN, true);
UTIL.shutdownMiniCluster();
}

@Rule
public final TestName testName = new TestName();
@BeforeEach
public void setUp(TestInfo testInfo) {
methodName = testInfo.getTestMethod().get().getName();
}

protected static TableName TABLE_NAME = TableName.valueOf("async");

Expand Down Expand Up @@ -149,11 +134,11 @@ private static Scan createBatchSmallResultSizeScan() {
}

private static AsyncTable<?> getRawTable() {
return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME);
return CONN.getTable(TABLE_NAME);
}

private static AsyncTable<?> getTable() {
return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
return CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
}

private static List<Pair<String, Supplier<Scan>>> getScanCreator() {
Expand All @@ -164,23 +149,20 @@ private static List<Pair<String, Supplier<Scan>>> getScanCreator() {
AbstractTestAsyncTableScan::createBatchSmallResultSizeScan));
}

protected static List<Object[]> getScanCreatorParams() {
return getScanCreator().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() })
.collect(Collectors.toList());
protected static Stream<Arguments> getScanCreatorParams() {
return getScanCreator().stream().map(p -> Arguments.of(p.getFirst(), p.getSecond()));
}

private static List<Pair<String, Supplier<AsyncTable<?>>>> getTableCreator() {
return Arrays.asList(Pair.newPair("raw", AbstractTestAsyncTableScan::getRawTable),
Pair.newPair("normal", AbstractTestAsyncTableScan::getTable));
}

protected static List<Object[]> getTableAndScanCreatorParams() {
protected static Stream<Arguments> getTableAndScanCreatorParams() {
List<Pair<String, Supplier<AsyncTable<?>>>> tableCreator = getTableCreator();
List<Pair<String, Supplier<Scan>>> scanCreator = getScanCreator();
return tableCreator.stream()
.flatMap(tp -> scanCreator.stream()
.map(sp -> new Object[] { tp.getFirst(), tp.getSecond(), sp.getFirst(), sp.getSecond() }))
.collect(Collectors.toList());
return tableCreator.stream().flatMap(tp -> scanCreator.stream()
.map(sp -> Arguments.of(tp.getFirst(), tp.getSecond(), sp.getFirst(), sp.getSecond())));
}

protected abstract Scan createScan();
Expand Down Expand Up @@ -211,25 +193,22 @@ protected final List<Result> convertFromBatchResult(List<Result> results) {
}

protected static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) {
final Configuration conf = MINI_CLUSTER_RULE.getTestingUtility().getConfiguration();
Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
"Span for test failed to complete.", OTEL_CLASS_RULE::getSpans, hasItem(parentSpanMatcher)));
UTIL.waitFor(TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
"Span for test failed to complete.", OTEL_EXT::getSpans, hasItem(parentSpanMatcher)));
}

protected static Stream<SpanData> spanStream() {
return OTEL_CLASS_RULE.getSpans().stream().filter(Objects::nonNull);
return OTEL_EXT.getSpans().stream().filter(Objects::nonNull);
}

@Test
@TestTemplate
public void testScanAll() throws Exception {
List<Result> results = doScan(createScan(), -1);
// make sure all scanners are closed at RS side
MINI_CLUSTER_RULE.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream()
UTIL.getHBaseCluster().getRegionServerThreads().stream()
.map(JVMClusterUtil.RegionServerThread::getRegionServer).forEach(
rs -> assertEquals(
"The scanner count of " + rs.getServerName() + " is "
+ rs.getRSRpcServices().getScannersCount(),
0, rs.getRSRpcServices().getScannersCount()));
rs -> assertEquals(0, rs.getRSRpcServices().getScannersCount(), "The scanner count of "
+ rs.getServerName() + " is " + rs.getRSRpcServices().getScannersCount()));
assertEquals(COUNT, results.size());
IntStream.range(0, COUNT).forEach(i -> {
Result result = results.get(i);
Expand All @@ -244,43 +223,41 @@ private void assertResultEquals(Result result, int i) {
assertEquals(i * i, Bytes.toInt(result.getValue(FAMILY, CQ2)));
}

@Test
@TestTemplate
public void testReversedScanAll() throws Exception {
List<Result> results =
TraceUtil.trace(() -> doScan(createScan().setReversed(true), -1), testName.getMethodName());
TraceUtil.trace(() -> doScan(createScan().setReversed(true), -1), methodName);
assertEquals(COUNT, results.size());
IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1));
assertTraceContinuity();
}

@Test
@TestTemplate
public void testScanNoStopKey() throws Exception {
int start = 345;
List<Result> results = TraceUtil.trace(
() -> doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1),
testName.getMethodName());
methodName);
assertEquals(COUNT - start, results.size());
IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i));
assertTraceContinuity();
}

@Test
@TestTemplate
public void testReverseScanNoStopKey() throws Exception {
int start = 765;
final Scan scan =
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true);
List<Result> results = TraceUtil.trace(() -> doScan(scan, -1), testName.getMethodName());
List<Result> results = TraceUtil.trace(() -> doScan(scan, -1), methodName);
assertEquals(start + 1, results.size());
IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
assertTraceContinuity();
}

@Test
@TestTemplate
public void testScanWrongColumnFamily() {
final Exception e = assertThrows(Exception.class,
() -> TraceUtil.trace(
() -> doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1),
testName.getMethodName()));
final Exception e = assertThrows(Exception.class, () -> TraceUtil.trace(
() -> doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1), methodName));
// hamcrest generic enforcement for `anyOf` is a pain; skip it
// but -- don't we always unwrap ExecutionExceptions -- bug?
if (e instanceof NoSuchColumnFamilyException) {
Expand Down Expand Up @@ -349,7 +326,7 @@ private void testReversedScan(int start, boolean startInclusive, int stop, boole
IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i));
}

@Test
@TestTemplate
public void testScanWithStartKeyAndStopKey() throws Exception {
testScan(1, true, 998, false, -1); // from first region to last region
testScan(123, true, 345, true, -1);
Expand All @@ -358,7 +335,7 @@ public void testScanWithStartKeyAndStopKey() throws Exception {
testScan(456, false, 678, false, -1);
}

@Test
@TestTemplate
public void testReversedScanWithStartKeyAndStopKey() throws Exception {
testReversedScan(998, true, 1, false, -1); // from last region to first region
testReversedScan(543, true, 321, true, -1);
Expand All @@ -367,23 +344,23 @@ public void testReversedScanWithStartKeyAndStopKey() throws Exception {
testReversedScan(876, false, 654, false, -1);
}

@Test
@TestTemplate
public void testScanAtRegionBoundary() throws Exception {
testScan(222, true, 333, true, -1);
testScan(333, true, 444, false, -1);
testScan(444, false, 555, true, -1);
testScan(555, false, 666, false, -1);
}

@Test
@TestTemplate
public void testReversedScanAtRegionBoundary() throws Exception {
testReversedScan(333, true, 222, true, -1);
testReversedScan(444, true, 333, false, -1);
testReversedScan(555, false, 444, true, -1);
testReversedScan(666, false, 555, false, -1);
}

@Test
@TestTemplate
public void testScanWithLimit() throws Exception {
testScan(1, true, 998, false, 900); // from first region to last region
testScan(123, true, 234, true, 100);
Expand All @@ -392,7 +369,7 @@ public void testScanWithLimit() throws Exception {
testScan(456, false, 678, false, 100);
}

@Test
@TestTemplate
public void testScanWithLimitGreaterThanActualCount() throws Exception {
testScan(1, true, 998, false, 1000); // from first region to last region
testScan(123, true, 345, true, 200);
Expand All @@ -401,7 +378,7 @@ public void testScanWithLimitGreaterThanActualCount() throws Exception {
testScan(456, false, 678, false, 200);
}

@Test
@TestTemplate
public void testReversedScanWithLimit() throws Exception {
testReversedScan(998, true, 1, false, 900); // from last region to first region
testReversedScan(543, true, 321, true, 100);
Expand All @@ -410,7 +387,7 @@ public void testReversedScanWithLimit() throws Exception {
testReversedScan(876, false, 654, false, 100);
}

@Test
@TestTemplate
public void testReversedScanWithLimitGreaterThanActualCount() throws Exception {
testReversedScan(998, true, 1, false, 1000); // from last region to first region
testReversedScan(543, true, 321, true, 200);
Expand All @@ -419,7 +396,7 @@ public void testReversedScanWithLimitGreaterThanActualCount() throws Exception {
testReversedScan(876, false, 654, false, 200);
}

@Test
@TestTemplate
public void testScanEndingEarly() throws Exception {
testScan(1, true, 998, false, 0, 900); // from first region to last region
testScan(123, true, 234, true, 0, 100);
Expand Down
Loading