diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestCompactionWithDeviceSimulator.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestCompactionWithDeviceSimulator.java new file mode 100644 index 000000000000..5ed4cb92fccf --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestCompactionWithDeviceSimulator.java @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.devsim.EBSDevice; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator; +import org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator; +import org.apache.hadoop.hbase.io.hfile.UncompressedBlockSizePredicator; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.StoreEngine; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; +import org.apache.hadoop.hbase.testclassification.IntegrationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.util.ToolRunner; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; 
+import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.Sets; +import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; + +/** + * Integration test that demonstrates the EBS device layer simulator's value for diagnosing + * compaction throughput under different storage device constraints. Runs HBase major compactions + * under two simulated EBS volume bandwidth limits (constrained and baseline) with realistic HFile + * configuration (Snappy compression, tunable compression ratio, configurable block parameters) and + * produces a diagnostic throughput analysis comparing the scenarios. + *

+ * This test always starts a local MiniDFSCluster with the EBS device layer installed — the + * distributed cluster mode of IntegrationTestBase is not applicable because the device simulator + * requires the {@code ThrottledFsDatasetFactory} to be injected at DataNode startup. + *

+ * JUnit / maven-failsafe execution:
+ *
+ * <pre>
+ * mvn verify -pl hbase-it -Dtest=IntegrationTestCompactionWithDeviceSimulator
+ * </pre>
+ *
+ * CLI execution via hbase script:
+ *
+ * <pre>
+ * hbase org.apache.hadoop.hbase.IntegrationTestCompactionWithDeviceSimulator \
+ *   -totalDataBytes 536870912 -constrainedBwMbps 10 -compression LZ4
+ * </pre>
+ *
+ * Configurable parameters (settable via CLI flags or
+ * {@code -Dhbase.IntegrationTestCompactionWithDeviceSimulator.<option>=<value>}):
+ *
+ * <pre>
+ * # Data generation
+ *   totalDataBytes          (default 1073741824 = 1 GB)
+ *   valueSize               (default 102400 = 100 KB)
+ *   numFlushCycles          (default 10)
+ *   targetCompressionRatio  (default 3.0)
+ *
+ * # HFile / column family
+ *   compression             (default SNAPPY)
+ *   dataBlockEncoding       (default NONE)
+ *   blockSize               (default 65536 = 64 KB)
+ *   bloomType               (default ROW)
+ *   blockPredicator         (default PreviousBlockCompressionRatePredicator)
+ *
+ * # Device simulator
+ *   constrainedBwMbps       (default 25)
+ *   baselineBwMbps          (default 250)
+ *   budgetIops              (default 100000)
+ *   deviceLatencyUs         (default 0)
+ *   volumesPerDataNode      (default 1)
+ * </pre>
+ */ +@Tag(IntegrationTests.TAG) +public class IntegrationTestCompactionWithDeviceSimulator extends IntegrationTestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(IntegrationTestCompactionWithDeviceSimulator.class); + + private static final String CLASS_NAME = + IntegrationTestCompactionWithDeviceSimulator.class.getSimpleName(); + private static final String CONF_PREFIX = "hbase." + CLASS_NAME + "."; + + private static final TableName TABLE_NAME = TableName.valueOf(CLASS_NAME); + private static final byte[] FAMILY = Bytes.toBytes("f"); + private static final byte[] QUALIFIER = Bytes.toBytes("q"); + + private static final Map> PREDICATOR_CLASSES = + Map.of("PreviousBlockCompressionRatePredicator", PreviousBlockCompressionRatePredicator.class, + "UncompressedBlockSizePredicator", UncompressedBlockSizePredicator.class); + + // CLI option keys + private static final String OPT_TOTAL_DATA_BYTES = "totalDataBytes"; + private static final String OPT_VALUE_SIZE = "valueSize"; + private static final String OPT_NUM_FLUSH_CYCLES = "numFlushCycles"; + private static final String OPT_TARGET_COMPRESSION_RATIO = "targetCompressionRatio"; + private static final String OPT_COMPRESSION = "compression"; + private static final String OPT_DATA_BLOCK_ENCODING = "dataBlockEncoding"; + private static final String OPT_BLOCK_SIZE = "blockSize"; + private static final String OPT_BLOOM_TYPE = "bloomType"; + private static final String OPT_BLOCK_PREDICATOR = "blockPredicator"; + private static final String OPT_CONSTRAINED_BW_MBPS = "constrainedBwMbps"; + private static final String OPT_BASELINE_BW_MBPS = "baselineBwMbps"; + private static final String OPT_BUDGET_IOPS = "budgetIops"; + private static final String OPT_DEVICE_LATENCY_US = "deviceLatencyUs"; + private static final String OPT_VOLUMES_PER_DN = "volumesPerDataNode"; + + @Override + @BeforeEach + public void setUp() throws Exception { + util = getTestingUtil(getConf()); + } + + @Override + @AfterEach + public 
void cleanUp() throws Exception { + // Each scenario cleans up its own cluster. + } + + @Override + public void setUpCluster() throws Exception { + // Cluster lifecycle is per-scenario. + } + + @Override + public void setUpMonkey() throws Exception { + // Chaos monkey is not applicable for this test. + } + + @Override + public TableName getTablename() { + return TABLE_NAME; + } + + @Override + protected Set getColumnFamilies() { + return Sets.newHashSet(Bytes.toString(FAMILY)); + } + + @Override + protected void addOptions() { + super.addOptions(); + addOptWithArg(OPT_TOTAL_DATA_BYTES, "Total uncompressed data bytes (default 1073741824)"); + addOptWithArg(OPT_VALUE_SIZE, "Per-cell value size in bytes (default 102400)"); + addOptWithArg(OPT_NUM_FLUSH_CYCLES, "Number of flush cycles / store files (default 10)"); + addOptWithArg(OPT_TARGET_COMPRESSION_RATIO, + "Target compression ratio for generated values (default 3.0)"); + addOptWithArg(OPT_COMPRESSION, + "Compression algorithm: SNAPPY, LZ4, ZSTD, GZ, NONE " + "(default SNAPPY)"); + addOptWithArg(OPT_DATA_BLOCK_ENCODING, + "Data block encoding: NONE, PREFIX, DIFF, FAST_DIFF, ROW_INDEX_V1 (default NONE)"); + addOptWithArg(OPT_BLOCK_SIZE, "HFile data block size in bytes (default 65536)"); + addOptWithArg(OPT_BLOOM_TYPE, "Bloom filter type: NONE, ROW, ROWCOL (default ROW)"); + addOptWithArg(OPT_BLOCK_PREDICATOR, + "Block compressed size predicator class short name (default " + + "PreviousBlockCompressionRatePredicator)"); + addOptWithArg(OPT_CONSTRAINED_BW_MBPS, + "Per-volume BW limit for constrained scenario in MB/s (default 25)"); + addOptWithArg(OPT_BASELINE_BW_MBPS, + "Per-volume BW limit for baseline scenario in MB/s (default 250)"); + addOptWithArg(OPT_BUDGET_IOPS, "Per-volume IOPS budget (default 100000)"); + addOptWithArg(OPT_DEVICE_LATENCY_US, + "Per-IO device latency in microseconds, 0=disabled (default 0)"); + addOptWithArg(OPT_VOLUMES_PER_DN, "Number of storage volumes per DataNode (default 1)"); + } + + 
@Override + protected void processOptions(CommandLine cmd) { + processBaseOptions(cmd); + Configuration c = getConf(); + setOptToConf(cmd, c, OPT_TOTAL_DATA_BYTES); + setOptToConf(cmd, c, OPT_VALUE_SIZE); + setOptToConf(cmd, c, OPT_NUM_FLUSH_CYCLES); + setOptToConf(cmd, c, OPT_TARGET_COMPRESSION_RATIO); + setOptToConf(cmd, c, OPT_COMPRESSION); + setOptToConf(cmd, c, OPT_DATA_BLOCK_ENCODING); + setOptToConf(cmd, c, OPT_BLOCK_SIZE); + setOptToConf(cmd, c, OPT_BLOOM_TYPE); + setOptToConf(cmd, c, OPT_BLOCK_PREDICATOR); + setOptToConf(cmd, c, OPT_CONSTRAINED_BW_MBPS); + setOptToConf(cmd, c, OPT_BASELINE_BW_MBPS); + setOptToConf(cmd, c, OPT_BUDGET_IOPS); + setOptToConf(cmd, c, OPT_DEVICE_LATENCY_US); + setOptToConf(cmd, c, OPT_VOLUMES_PER_DN); + } + + private static void setOptToConf(CommandLine cmd, Configuration conf, String opt) { + if (cmd.hasOption(opt)) { + conf.set(CONF_PREFIX + opt, cmd.getOptionValue(opt)); + } + } + + @Override + public int runTestFromCommandLine() throws Exception { + doTestCompaction(); + return 0; + } + + @Test + public void testCompactionThroughputVariesWithDeviceBandwidth() throws Exception { + doTestCompaction(); + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + int ret = ToolRunner.run(conf, new IntegrationTestCompactionWithDeviceSimulator(), args); + System.exit(ret); + } + + private void doTestCompaction() throws Exception { + Configuration baseConf = getConf() != null ? 
getConf() : HBaseConfiguration.create(); + + long totalDataBytes = + baseConf.getLong(CONF_PREFIX + OPT_TOTAL_DATA_BYTES, 1L * 1024 * 1024 * 1024); + int valueSize = baseConf.getInt(CONF_PREFIX + OPT_VALUE_SIZE, 100 * 1024); + int numFlushCycles = baseConf.getInt(CONF_PREFIX + OPT_NUM_FLUSH_CYCLES, 10); + double targetCompressionRatio = + Double.parseDouble(baseConf.get(CONF_PREFIX + OPT_TARGET_COMPRESSION_RATIO, "3.0")); + + String compressionName = baseConf.get(CONF_PREFIX + OPT_COMPRESSION, "SNAPPY"); + String dataBlockEncodingName = baseConf.get(CONF_PREFIX + OPT_DATA_BLOCK_ENCODING, "NONE"); + int blockSize = baseConf.getInt(CONF_PREFIX + OPT_BLOCK_SIZE, 64 * 1024); + String bloomTypeName = baseConf.get(CONF_PREFIX + OPT_BLOOM_TYPE, "ROW"); + String blockPredicatorName = + baseConf.get(CONF_PREFIX + OPT_BLOCK_PREDICATOR, "PreviousBlockCompressionRatePredicator"); + + int constrainedBwMbps = baseConf.getInt(CONF_PREFIX + OPT_CONSTRAINED_BW_MBPS, 25); + int baselineBwMbps = baseConf.getInt(CONF_PREFIX + OPT_BASELINE_BW_MBPS, 250); + int budgetIops = baseConf.getInt(CONF_PREFIX + OPT_BUDGET_IOPS, 100000); + int deviceLatencyUs = baseConf.getInt(CONF_PREFIX + OPT_DEVICE_LATENCY_US, 0); + int volumesPerDataNode = baseConf.getInt(CONF_PREFIX + OPT_VOLUMES_PER_DN, 1); + + Compression.Algorithm compression = Compression.Algorithm.valueOf(compressionName); + DataBlockEncoding dataBlockEncoding = DataBlockEncoding.valueOf(dataBlockEncodingName); + BloomType bloomType = BloomType.valueOf(bloomTypeName); + + Class predicatorClass = + PREDICATOR_CLASSES.get(blockPredicatorName); + if (predicatorClass == null) { + throw new IllegalArgumentException("Unknown blockPredicator '" + blockPredicatorName + + "'. 
Known values: " + PREDICATOR_CLASSES.keySet()); + } + + LOG.info("========== Test Configuration =========="); + LOG.info(" totalDataBytes={}, valueSize={}, numFlushCycles={}, targetCompressionRatio={}", + totalDataBytes, valueSize, numFlushCycles, targetCompressionRatio); + LOG.info(" compression={}, dataBlockEncoding={}, blockSize={}, bloomType={}", compression, + dataBlockEncoding, blockSize, bloomType); + LOG.info(" blockPredicator={}", predicatorClass.getName()); + LOG.info(" constrainedBwMbps={}, baselineBwMbps={}, budgetIops={}, deviceLatencyUs={}", + constrainedBwMbps, baselineBwMbps, budgetIops, deviceLatencyUs); + LOG.info(" volumesPerDataNode={}", volumesPerDataNode); + LOG.info("========================================"); + + CompactionPerformance constrained = + runCompactionScenario("constrained", constrainedBwMbps, budgetIops, deviceLatencyUs, + volumesPerDataNode, totalDataBytes, valueSize, numFlushCycles, targetCompressionRatio, + compression, dataBlockEncoding, blockSize, bloomType, predicatorClass); + + CompactionPerformance baseline = + runCompactionScenario("baseline", baselineBwMbps, budgetIops, deviceLatencyUs, + volumesPerDataNode, totalDataBytes, valueSize, numFlushCycles, targetCompressionRatio, + compression, dataBlockEncoding, blockSize, bloomType, predicatorClass); + + constrained.log(); + baseline.log(); + double durationRatio = (double) constrained.durationMs / baseline.durationMs; + double bwRatio = (double) baselineBwMbps / constrainedBwMbps; + LOG.info("========== Cross-Scenario Comparison =========="); + LOG.info(" Duration ratio (constrained/baseline): {}", String.format("%.2f", durationRatio)); + LOG.info(" BW ratio (baseline/constrained): {}", String.format("%.2f", bwRatio)); + LOG.info(" Effective throughput: constrained={} MB/s, baseline={} MB/s", + String.format("%.2f", constrained.effectiveThroughputMbps()), + String.format("%.2f", baseline.effectiveThroughputMbps())); + 
LOG.info("================================================"); + + long constrainedBwSleeps = constrained.bwReadSleepCount + constrained.bwWriteSleepCount; + assertTrue(constrainedBwSleeps > 0, + "Expected BW throttle sleeps in constrained scenario, got 0. " + + "The device simulator did not throttle compaction I/O."); + + assertTrue(constrained.durationMs > baseline.durationMs, + "Expected constrained scenario (" + constrainedBwMbps + " MB/s, " + constrained.durationMs + + " ms) to take longer than baseline (" + baselineBwMbps + " MB/s, " + baseline.durationMs + + " ms)."); + } + + private CompactionPerformance runCompactionScenario(String label, int bwMbps, int budgetIops, + int deviceLatencyUs, int volumesPerDataNode, long totalDataBytes, int valueSize, + int numFlushCycles, double targetCompressionRatio, Compression.Algorithm compression, + DataBlockEncoding dataBlockEncoding, int blockSize, BloomType bloomType, + Class predicatorClass) throws Exception { + + LOG.info(">>> Starting scenario '{}' with device BW = {} MB/s", label, bwMbps); + + Configuration conf = HBaseConfiguration.create(); + conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, + "org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy"); + conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); + conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); + conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); + conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, + NoLimitThroughputController.class.getName()); + conf.set(BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR, + predicatorClass.getName()); + + EBSDevice.configure(conf, bwMbps, budgetIops, deviceLatencyUs); + + HBaseTestingUtil testUtil = new HBaseTestingUtil(conf); + try { + testUtil.startMiniZKCluster(); + 
MiniDFSCluster dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) + .storagesPerDatanode(volumesPerDataNode).build(); + dfsCluster.waitClusterUp(); + testUtil.setDFSCluster(dfsCluster); + testUtil.startMiniHBaseCluster(1, 1); + + ColumnFamilyDescriptorBuilder cfBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setCompressionType(compression) + .setCompactionCompressionType(compression).setDataBlockEncoding(dataBlockEncoding) + .setBlocksize(blockSize).setBloomFilterType(bloomType); + + Admin admin = testUtil.getAdmin(); + admin.createTable( + TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(cfBuilder.build()).build()); + testUtil.waitUntilAllRegionsAssigned(TABLE_NAME); + + long uncompressedDataLoaded = loadData(testUtil, totalDataBytes, valueSize, numFlushCycles, + targetCompressionRatio, admin); + + HStore store = getStore(testUtil); + int storeFilesBefore = store.getStorefilesCount(); + LOG.info(" Scenario '{}': loaded {} MB uncompressed across {} store files", label, + String.format("%.2f", uncompressedDataLoaded / (1024.0 * 1024.0)), storeFilesBefore); + + EBSDevice.resetMetrics(); + + long startTime = EnvironmentEdgeManager.currentTime(); + admin.majorCompact(TABLE_NAME); + waitForCompaction(store); + long durationMs = EnvironmentEdgeManager.currentTime() - startTime; + + CompactionPerformance result = new CompactionPerformance(label, bwMbps, durationMs, + uncompressedDataLoaded, storeFilesBefore); + + LOG.info("<<< Scenario '{}' complete in {} ms", label, durationMs); + return result; + } finally { + EBSDevice.shutdown(); + testUtil.shutdownMiniCluster(); + } + } + + private long loadData(HBaseTestingUtil testUtil, long totalDataBytes, int valueSize, + int numFlushCycles, double targetCompressionRatio, Admin admin) throws IOException { + long bytesPerFlush = totalDataBytes / numFlushCycles; + int rowsPerFlush = Math.max(1, (int) (bytesPerFlush / valueSize)); + long totalUncompressed = 0; + Random rng = new 
Random(0xCAFEBABE); + int rowCounter = 0; + + try (Table table = testUtil.getConnection().getTable(TABLE_NAME)) { + for (int cycle = 0; cycle < numFlushCycles; cycle++) { + for (int r = 0; r < rowsPerFlush; r++) { + byte[] rowKey = Bytes.toBytes(String.format("row-%010d", rowCounter++)); + byte[] value = generateCompressibleValue(rng, valueSize, targetCompressionRatio); + table.put(new Put(rowKey).addColumn(FAMILY, QUALIFIER, value)); + totalUncompressed += valueSize; + } + admin.flush(TABLE_NAME); + waitForFlush(testUtil); + if ((cycle + 1) % 5 == 0 || cycle == numFlushCycles - 1) { + LOG.info(" Flush cycle {}/{} complete ({} MB loaded so far)", cycle + 1, numFlushCycles, + String.format("%.1f", totalUncompressed / (1024.0 * 1024.0))); + } + } + } + + return totalUncompressed; + } + + /** + * Generate a byte array that compresses at approximately the requested ratio under Snappy. The + * value is split into a random (incompressible) segment and a repeated-byte (highly compressible) + * segment. For a 3:1 target ratio, approximately 1/3 of the bytes are random and 2/3 are a + * constant. 
+ */ + static byte[] generateCompressibleValue(Random rng, int size, double targetCompressionRatio) { + byte[] value = new byte[size]; + int randomBytes = Math.max(1, (int) (size / targetCompressionRatio)); + byte[] randomPart = new byte[randomBytes]; + rng.nextBytes(randomPart); + System.arraycopy(randomPart, 0, value, 0, randomBytes); + byte fill = (byte) 0x42; + for (int i = randomBytes; i < size; i++) { + value[i] = fill; + } + return value; + } + + private HStore getStore(HBaseTestingUtil testUtil) { + SingleProcessHBaseCluster cluster = testUtil.getMiniHBaseCluster(); + List rsts = cluster.getRegionServerThreads(); + for (JVMClusterUtil.RegionServerThread rst : rsts) { + HRegionServer hrs = rst.getRegionServer(); + for (Region region : hrs.getRegions(TABLE_NAME)) { + return ((HRegion) region).getStores().iterator().next(); + } + } + throw new IllegalStateException("No store found for table " + TABLE_NAME); + } + + private void waitForCompaction(HStore store) throws InterruptedException { + long deadline = EnvironmentEdgeManager.currentTime() + 600_000; + while (store.getStorefilesCount() > 1) { + if (EnvironmentEdgeManager.currentTime() > deadline) { + throw new RuntimeException("Compaction did not complete within 10 minutes. 
Store files: " + + store.getStorefilesCount()); + } + Thread.sleep(500); + } + } + + private void waitForFlush(HBaseTestingUtil testUtil) throws IOException { + long deadline = EnvironmentEdgeManager.currentTime() + 120_000; + while (EnvironmentEdgeManager.currentTime() < deadline) { + long memstoreSize = 0; + for (HRegion region : testUtil.getMiniHBaseCluster().getRegionServer(0) + .getRegions(TABLE_NAME)) { + memstoreSize += region.getMemStoreDataSize(); + } + if (memstoreSize == 0) { + return; + } + try { + Thread.sleep(200); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted waiting for flush", e); + } + } + throw new IOException("Flush did not complete within timeout"); + } + + private static class CompactionPerformance { + final String label; + final int bwMbps; + final long durationMs; + final long bytesRead; + final long bytesWritten; + final long bwReadSleepCount; + final long bwWriteSleepCount; + final long bwReadSleepTimeMs; + final long bwWriteSleepTimeMs; + final long deviceReadOps; + final long deviceWriteOps; + final long appReadOps; + final long appWriteOps; + final long iopsReadSleepCount; + final long iopsWriteSleepCount; + final long uncompressedDataBytes; + final int storeFilesBefore; + + CompactionPerformance(String label, int bwMbps, long durationMs, long uncompressedDataBytes, + int storeFilesBefore) { + this.label = label; + this.bwMbps = bwMbps; + this.durationMs = durationMs; + this.bytesRead = EBSDevice.getTotalBytesRead(); + this.bytesWritten = EBSDevice.getTotalBytesWritten(); + this.bwReadSleepCount = EBSDevice.getReadSleepCount(); + this.bwWriteSleepCount = EBSDevice.getWriteSleepCount(); + this.bwReadSleepTimeMs = EBSDevice.getReadSleepTimeMs(); + this.bwWriteSleepTimeMs = EBSDevice.getWriteSleepTimeMs(); + this.deviceReadOps = EBSDevice.getDeviceReadOps(); + this.deviceWriteOps = EBSDevice.getDeviceWriteOps(); + this.appReadOps = EBSDevice.getReadOpCount(); + 
this.appWriteOps = EBSDevice.getWriteOpCount(); + this.iopsReadSleepCount = EBSDevice.getIopsReadSleepCount(); + this.iopsWriteSleepCount = EBSDevice.getIopsWriteSleepCount(); + this.uncompressedDataBytes = uncompressedDataBytes; + this.storeFilesBefore = storeFilesBefore; + } + + double effectiveThroughputMbps() { + if (durationMs == 0) { + return Double.MAX_VALUE; + } + return (bytesRead + bytesWritten) / (1024.0 * 1024.0) / (durationMs / 1000.0); + } + + double observedCompressionRatio() { + if (bytesWritten == 0) { + return 1.0; + } + return (double) uncompressedDataBytes / bytesWritten; + } + + void log() { + double throughputMbps = effectiveThroughputMbps(); + LOG.info("=== Scenario: {} (device BW = {} MB/s) ===", label, bwMbps); + LOG.info(" Store files before compaction: {}", storeFilesBefore); + LOG.info(" Compaction duration: {} ms", durationMs); + LOG.info(" Device bytes read: {} ({} MB)", bytesRead, + String.format("%.2f", bytesRead / (1024.0 * 1024.0))); + LOG.info(" Device bytes written: {} ({} MB)", bytesWritten, + String.format("%.2f", bytesWritten / (1024.0 * 1024.0))); + LOG.info(" Effective device throughput: {} MB/s", String.format("%.2f", throughputMbps)); + LOG.info(" Observed compression ratio: {}:1 (uncompressed={} MB, on-disk-written={} MB)", + String.format("%.2f", observedCompressionRatio()), + String.format("%.2f", uncompressedDataBytes / (1024.0 * 1024.0)), + String.format("%.2f", bytesWritten / (1024.0 * 1024.0))); + LOG.info(" BW sleeps: read={}({} ms) write={}({} ms)", bwReadSleepCount, bwReadSleepTimeMs, + bwWriteSleepCount, bwWriteSleepTimeMs); + LOG.info(" App ops: read={} write={} | Device ops: read={} write={}", appReadOps, + appWriteOps, deviceReadOps, deviceWriteOps); + LOG.info(" IOPS sleeps: read={} write={}", iopsReadSleepCount, iopsWriteSleepCount); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/EBSDevice.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/EBSDevice.java new file mode 100644 index 000000000000..a713e24fbdc3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/EBSDevice.java @@ -0,0 +1,613 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.devsim; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Static registry and metrics aggregation for the EBS device layer. Each {@link ThrottledFsDataset} + * proxy (one per DataNode) registers itself here at creation time. Provides: + *
+ * <ul>
+ * <li>Per-DataNode and per-volume metric accessors</li>
+ * <li>Aggregate accessors across all DataNodes (for simple single-DN tests)</li>
+ * <li>Periodic metrics reporter (same style as the original ThrottledFileSystem reporter)</li>
+ * <li>Configuration key constants and a {@link #configure} helper for test setup</li>
+ * </ul>
+ * <p>
+ * Test isolation: This class uses static mutable state. Tests must call {@link #shutdown()} + * in their {@code @AfterClass} method. Parallel test execution in the same JVM is not supported. + */ +public final class EBSDevice { + + private static final Logger LOG = LoggerFactory.getLogger(EBSDevice.class); + + // ---- Configuration keys ---- + + public static final String IO_BUDGET_BYTES_PER_SEC_KEY = "hbase.test.devsim.budget.bytes.per.sec"; + public static final String IO_BUDGET_IOPS_KEY = "hbase.test.devsim.budget.iops"; + public static final String IO_BUDGET_WINDOW_MS_KEY = "hbase.test.devsim.budget.window.ms"; + public static final int DEFAULT_WINDOW_MS = 100; + + public static final String IO_BUDGET_REPORT_INTERVAL_SEC_KEY = + "hbase.test.devsim.budget.report.interval.sec"; + public static final int DEFAULT_REPORT_INTERVAL_SEC = 10; + + public static final String IO_MAX_IO_SIZE_KB_KEY = "hbase.test.devsim.max.iosize.kb"; + public static final int DEFAULT_MAX_IO_SIZE_KB = 1024; + + public static final String IO_INSTANCE_MBPS_KEY = "hbase.test.devsim.instance.mbps"; + public static final int DEFAULT_INSTANCE_MBPS = 0; + + public static final String IO_DEVICE_LATENCY_US_KEY = "hbase.test.devsim.device.latency.us"; + public static final int DEFAULT_DEVICE_LATENCY_US = 1000; + + // ---- Per-DataNode context ---- + + /** + * Holds the EBS volume devices and instance-level budget for a single DataNode. One instance per + * {@link ThrottledFsDataset} proxy. 
+ */ + public static final class DataNodeContext { + private final String datanodeId; + private final EBSVolumeDevice[] volumes; + private final IOBudget instanceBwBudget; + private final long budgetBytesPerSec; + private final int budgetIops; + private final int deviceLatencyUs; + + DataNodeContext(String datanodeId, EBSVolumeDevice[] volumes, IOBudget instanceBwBudget, + long budgetBytesPerSec, int budgetIops, int deviceLatencyUs) { + this.datanodeId = datanodeId; + this.volumes = volumes; + this.instanceBwBudget = instanceBwBudget; + this.budgetBytesPerSec = budgetBytesPerSec; + this.budgetIops = budgetIops; + this.deviceLatencyUs = deviceLatencyUs; + } + + public String getDatanodeId() { + return datanodeId; + } + + public EBSVolumeDevice[] getVolumes() { + return volumes; + } + + public int getNumVolumes() { + return volumes.length; + } + + public IOBudget getInstanceBwBudget() { + return instanceBwBudget; + } + + public long getBudgetBytesPerSec() { + return budgetBytesPerSec; + } + + public int getBudgetIops() { + return budgetIops; + } + + public int getDeviceLatencyUs() { + return deviceLatencyUs; + } + + /** + * Charge bytes against the instance-level BW budget (shared across all volumes on this DN). 
+ * @return ms slept + */ + public long consumeInstanceBw(long bytes) { + if (instanceBwBudget != null) { + return instanceBwBudget.consume(bytes); + } + return 0; + } + + public void reset() { + for (EBSVolumeDevice vol : volumes) { + vol.reset(); + } + if (instanceBwBudget != null) { + instanceBwBudget.reset(); + } + } + } + + // ---- Static registry ---- + + private static final List DATANODES = new CopyOnWriteArrayList<>(); + + private static volatile ScheduledExecutorService metricsExecutor; + private static volatile ScheduledFuture metricsFuture; + private static volatile MetricsReporter metricsReporter; + private static final AtomicLong readInterceptCount = new AtomicLong(); + private static final AtomicLong writeInterceptCount = new AtomicLong(); + private static final AtomicLong unresolvedVolumeCount = new AtomicLong(); + + private EBSDevice() { + } + + /** + * Register a DataNode's EBS context. Called by {@link ThrottledFsDataset} at proxy creation. + */ + public static DataNodeContext register(String datanodeId, EBSVolumeDevice[] volumes, + IOBudget instanceBwBudget, long budgetBytesPerSec, int budgetIops, int deviceLatencyUs) { + DataNodeContext ctx = new DataNodeContext(datanodeId, volumes, instanceBwBudget, + budgetBytesPerSec, budgetIops, deviceLatencyUs); + DATANODES.add(ctx); + LOG.info( + "EBSDevice: registered DN {} with {} volumes " + + "(BW={} MB/s, IOPS={}, latency={}us per volume)", + datanodeId, volumes.length, + budgetBytesPerSec > 0 + ? String.format("%.1f", budgetBytesPerSec / (1024.0 * 1024.0)) + : "unlimited", + budgetIops > 0 ? budgetIops : "unlimited", deviceLatencyUs > 0 ? 
deviceLatencyUs : "off");
    // NOTE(review): tail of a registration method whose opening lines fall
    // outside this chunk; it returns the freshly built DataNode context.
    return ctx;
  }

  /** Returns the live (mutable) list of registered DataNode contexts — not a copy. */
  public static List getDataNodes() {
    return DATANODES;
  }

  /** Returns the DataNode context registered at {@code index} (registration order). */
  public static DataNodeContext getDataNodeContext(int index) {
    return DATANODES.get(index);
  }

  /** Returns how many DataNodes are currently registered with the device layer. */
  public static int getNumDataNodes() {
    return DATANODES.size();
  }

  // ---- Aggregate accessors (sum across all DNs and volumes) ----

  // Applies fn to every volume of every registered DataNode and sums the results.
  // NOTE(review): the parameter reads as a raw ToLongFunction here; the generic
  // parameter (<EBSVolumeDevice>) was likely lost in extraction -- confirm.
  private static long sumVolumes(java.util.function.ToLongFunction fn) {
    long total = 0;
    for (DataNodeContext dn : DATANODES) {
      for (EBSVolumeDevice vol : dn.volumes) {
        total += fn.applyAsLong(vol);
      }
    }
    return total;
  }

  /** Total bytes read, summed across all volumes of all DataNodes. */
  public static long getTotalBytesRead() {
    return sumVolumes(v -> v.totalBytesRead.get());
  }

  /** Total bytes written, summed across all volumes of all DataNodes. */
  public static long getTotalBytesWritten() {
    return sumVolumes(v -> v.totalBytesWritten.get());
  }

  /** Application-level read operations (before device coalescing). */
  public static long getReadOpCount() {
    return sumVolumes(v -> v.readOpCount.get());
  }

  /** Application-level write operations (before device coalescing). */
  public static long getWriteOpCount() {
    return sumVolumes(v -> v.writeOpCount.get());
  }

  /** Device-level (coalesced) read IOs charged against the IOPS budgets. */
  public static long getDeviceReadOps() {
    return sumVolumes(v -> v.volumeDeviceReadOps.get());
  }

  /** Device-level (coalesced) write IOs charged against the IOPS budgets. */
  public static long getDeviceWriteOps() {
    return sumVolumes(v -> v.volumeDeviceWriteOps.get());
  }

  /** Milliseconds slept in bandwidth throttling on the read path. */
  public static long getReadSleepTimeMs() {
    return sumVolumes(v -> v.bwReadSleepTimeMs.get());
  }

  /** Milliseconds slept in bandwidth throttling on the write path. */
  public static long getWriteSleepTimeMs() {
    return sumVolumes(v -> v.bwWriteSleepTimeMs.get());
  }

  /** Number of bandwidth-throttle sleeps on the read path. */
  public static long getReadSleepCount() {
    return sumVolumes(v -> v.bwReadSleepCount.get());
  }

  /** Number of bandwidth-throttle sleeps on the write path. */
  public static long getWriteSleepCount() {
    return sumVolumes(v -> v.bwWriteSleepCount.get());
  }

  /** Milliseconds slept in IOPS throttling on the read path. */
  public static long getIopsReadSleepTimeMs() {
    return sumVolumes(v -> v.iopsReadSleepTimeMs.get());
  }

  /** Milliseconds slept in IOPS throttling on the write path. */
  public static long getIopsWriteSleepTimeMs() {
    return sumVolumes(v -> v.iopsWriteSleepTimeMs.get());
  }

  /** Number of IOPS-throttle sleeps on the read path. */
  public static long getIopsReadSleepCount() {
    return sumVolumes(v -> v.iopsReadSleepCount.get());
  }

  /** Number of IOPS-throttle sleeps on the write path. */
  public static long getIopsWriteSleepCount() {
    return sumVolumes(v -> v.iopsWriteSleepCount.get());
  }

  /** Number of simulated per-IO device-latency sleeps. */
  public static long getLatencySleepCount() {
    return sumVolumes(v -> v.latencySleepCount.get());
  }

  /** Microseconds spent in simulated per-IO device latency. */
  public static long getLatencySleepTimeUs() {
    return sumVolumes(v -> v.latencySleepTimeUs.get());
  }

  // The next four getters report the FIRST DataNode's settings only; they
  // implicitly assume a homogeneous configuration across all DataNodes.

  /** Configured per-IO device latency in microseconds (first DN), or 0 if none registered. */
  public static int getDeviceLatencyUs() {
    if (!DATANODES.isEmpty()) {
      return DATANODES.get(0).deviceLatencyUs;
    }
    return 0;
  }

  /** Number of volumes on the first DataNode, or 0 if none registered. */
  public static int getNumVolumes() {
    if (!DATANODES.isEmpty()) {
      return DATANODES.get(0).volumes.length;
    }
    return 0;
  }

  /** Per-volume bandwidth budget in bytes/sec (first DN), or 0 if none registered. */
  public static long getBudgetBytesPerSec() {
    if (!DATANODES.isEmpty()) {
      return DATANODES.get(0).budgetBytesPerSec;
    }
    return 0;
  }

  /** Per-volume IOPS budget (first DN), or 0 if none registered. */
  public static int getBudgetIops() {
    if (!DATANODES.isEmpty()) {
      return DATANODES.get(0).budgetIops;
    }
    return 0;
  }

  /**
   * Human-readable per-volume device-op counters, e.g. {@code v0: R=.. W=..}.
   * The {@code DN<i>[...]} wrapper is only emitted when more than one DataNode
   * is registered, keeping single-DN output compact.
   */
  public static String getPerVolumeStats() {
    if (DATANODES.isEmpty()) {
      return "N/A (no DataNodes registered)";
    }
    StringBuilder sb = new StringBuilder();
    for (int d = 0; d < DATANODES.size(); d++) {
      DataNodeContext dn = DATANODES.get(d);
      if (DATANODES.size() > 1) {
        if (d > 0) sb.append("; ");
        sb.append("DN").append(d).append("[");
      }
      for (int i = 0; i < dn.volumes.length; i++) {
        if (i > 0) sb.append(", ");
        sb.append(String.format("v%d: R=%d W=%d", i, dn.volumes[i].volumeDeviceReadOps.get(),
          dn.volumes[i].volumeDeviceWriteOps.get()));
      }
      if (DATANODES.size() > 1) {
        sb.append("]");
      }
    }
    return sb.toString();
  }

  // ---- Interception bookkeeping (how many IO calls the proxy layer saw) ----

  /** Records that one read call was intercepted by the device layer. */
  public static void recordReadIntercept() {
    readInterceptCount.incrementAndGet();
  }

  /** Records that one write call was intercepted by the device layer. */
  public static void recordWriteIntercept() {
    writeInterceptCount.incrementAndGet();
  }

  /** Records an intercepted IO that could not be attributed to any volume. */
  public static void recordUnresolvedVolume() {
    unresolvedVolumeCount.incrementAndGet();
  }

  public static long getReadInterceptCount() {
    return readInterceptCount.get();
  }

  public static long getWriteInterceptCount() {
    return writeInterceptCount.get();
  }

  public static long getUnresolvedVolumeCount() {
    return unresolvedVolumeCount.get();
  }

  // ---- Lifecycle ----

  /**
   * Zeroes all per-volume counters and the intercept counters. When the
   * periodic reporter is running, the reset is done under the reporter's
   * monitor so its delta baseline is cleared atomically with the counters
   * (otherwise a concurrent run() could compute negative deltas).
   */
  public static void resetMetrics() {
    MetricsReporter reporter = metricsReporter;
    if (reporter != null) {
      synchronized (reporter) {
        for (DataNodeContext dn : DATANODES) {
          dn.reset();
        }
        reporter.resetBaseline();
      }
    } else {
      for (DataNodeContext dn : DATANODES) {
        dn.reset();
      }
    }
    readInterceptCount.set(0);
    writeInterceptCount.set(0);
    unresolvedVolumeCount.set(0);
  }

  /**
   * Starts the periodic metrics logger on a daemon thread. Idempotent: a
   * second call while the reporter is running is a no-op.
   * @param intervalSec logging period in seconds (also used for rate math)
   */
  public static synchronized void startMetricsReporter(int intervalSec) {
    if (metricsExecutor != null) {
      return;
    }
    metricsExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
      Thread t = new Thread(r, "EBSDevice-MetricsReporter");
      t.setDaemon(true);
      return t;
    });
    MetricsReporter reporter = new MetricsReporter(intervalSec);
    metricsReporter = reporter;
    metricsFuture =
      metricsExecutor.scheduleAtFixedRate(reporter, intervalSec, intervalSec, TimeUnit.SECONDS);
  }

  /**
   * Stops the periodic reporter. cancel(false) lets an in-flight run() finish;
   * shutdownNow() then tears down the executor thread.
   */
  // NOTE(review): fields read as raw ScheduledFuture / ScheduledExecutorService
  // here; type parameters were likely lost in extraction -- confirm.
  public static synchronized void stopMetricsReporter() {
    ScheduledFuture f = metricsFuture;
    if (f != null) {
      f.cancel(false);
      metricsFuture = null;
    }
    ScheduledExecutorService exec = metricsExecutor;
    if (exec != null) {
      exec.shutdownNow();
      metricsExecutor = null;
    }
    metricsReporter = null;
  }

  /**
   * Unregister all DataNodes and stop the metrics reporter. Call from test teardown.
   */
  public static void shutdown() {
    stopMetricsReporter();
    DATANODES.clear();
    readInterceptCount.set(0);
    writeInterceptCount.set(0);
    unresolvedVolumeCount.set(0);
  }

  // ---- Configuration helper ----

  /**
   * Configure a Hadoop {@link Configuration} for the EBS device layer. Sets the
   * {@code dfs.datanode.fsdataset.factory} key and all EBS volume properties.
   * @param conf Hadoop configuration to modify
   * @param budgetMbps per-volume BW cap in MB/s (0 = unlimited)
   * @param budgetIops per-volume IOPS cap (0 = unlimited)
   * @param deviceLatencyUs per-IO device latency in microseconds (0 = disabled)
   */
  public static void configure(Configuration conf, int budgetMbps, int budgetIops,
    int deviceLatencyUs) {
    conf.set("dfs.datanode.fsdataset.factory", ThrottledFsDatasetFactory.class.getName());
    if (budgetMbps > 0) {
      // long cast before multiply avoids int overflow for large MB/s values
      conf.setLong(IO_BUDGET_BYTES_PER_SEC_KEY, (long) budgetMbps * 1024L * 1024L);
    }
    if (budgetIops > 0) {
      conf.setInt(IO_BUDGET_IOPS_KEY, budgetIops);
    }
    if (deviceLatencyUs > 0) {
      conf.setInt(IO_DEVICE_LATENCY_US_KEY, deviceLatencyUs);
    }
  }

  /**
   * Overload with all knobs.
   * @param maxIoSizeKb max coalesced IO size in KB (always set, even if 0)
   * @param instanceMbps instance-level aggregate BW cap in MB/s (0 = unlimited)
   */
  public static void configure(Configuration conf, int budgetMbps, int budgetIops,
    int deviceLatencyUs, int maxIoSizeKb, int instanceMbps) {
    configure(conf, budgetMbps, budgetIops, deviceLatencyUs);
    conf.setInt(IO_MAX_IO_SIZE_KB_KEY, maxIoSizeKb);
    if (instanceMbps > 0) {
      conf.setInt(IO_INSTANCE_MBPS_KEY, instanceMbps);
    }
  }

  // ---- Periodic reporter ----

  /**
   * Logs one rate/utilization summary line per interval. Keeps a private
   * "previous" snapshot of every cumulative counter so each run() reports
   * per-interval deltas. All state is guarded by the instance monitor; run()
   * and resetBaseline() are synchronized so resetMetrics() can clear counters
   * and baseline atomically.
   */
  private static class MetricsReporter implements Runnable {
    private final int intervalSec;
    private long prevBytesRead;
    private long prevBytesWritten;
    private long prevBwReadSleepMs;
    private long prevBwWriteSleepMs;
    private long prevBwReadSleepCount;
    private long prevBwWriteSleepCount;
    private long prevReadOps;
    private long prevWriteOps;
    private long prevIopsReadSleeps;
    private long prevIopsWriteSleeps;
    private long prevDeviceReadOps;
    private long prevDeviceWriteOps;
    // keyed "d<dn>v<vol>" -> last observed (read+write) device ops for that volume
    // NOTE(review): reads as a raw Map here; <String, Long> likely lost in
    // extraction -- confirm (getOrDefault(.., 0L) below needs it).
    private final Map prevPerVolDeviceOps = new HashMap<>();

    MetricsReporter(int intervalSec) {
      this.intervalSec = intervalSec;
    }

    // Forgets all previous snapshots so the next run() reports deltas from zero.
    synchronized void resetBaseline() {
      prevBytesRead = 0;
      prevBytesWritten = 0;
      prevBwReadSleepMs = 0;
      prevBwWriteSleepMs = 0;
      prevBwReadSleepCount = 0;
      prevBwWriteSleepCount = 0;
      prevReadOps = 0;
      prevWriteOps = 0;
      prevIopsReadSleeps = 0;
      prevIopsWriteSleeps = 0;
      prevDeviceReadOps = 0;
      prevDeviceWriteOps = 0;
      prevPerVolDeviceOps.clear();
    }

    @Override
    public synchronized void run() {
      // Snapshot all cumulative counters first, then diff against the previous
      // snapshot; the prev* fields are only updated at the very end.
      long curBytesRead = getTotalBytesRead();
      long curBytesWritten = getTotalBytesWritten();
      long curBwReadSleepMs = getReadSleepTimeMs();
      long curBwWriteSleepMs = getWriteSleepTimeMs();
      long curBwReadSleepCount = getReadSleepCount();
      long curBwWriteSleepCount = getWriteSleepCount();
      long curReadOps = getReadOpCount();
      long curWriteOps = getWriteOpCount();
      long curIopsReadSleeps = getIopsReadSleepCount();
      long curIopsWriteSleeps = getIopsWriteSleepCount();
      long curDeviceReadOps = getDeviceReadOps();
      long curDeviceWriteOps = getDeviceWriteOps();

      long dBytesRead = curBytesRead - prevBytesRead;
      long dBytesWritten = curBytesWritten - prevBytesWritten;
      long dBwReadSleepMs = curBwReadSleepMs - prevBwReadSleepMs;
      long dBwWriteSleepMs = curBwWriteSleepMs - prevBwWriteSleepMs;
      long dBwReadSleeps = curBwReadSleepCount - prevBwReadSleepCount;
      long dBwWriteSleeps = curBwWriteSleepCount - prevBwWriteSleepCount;
      long dReadOps = curReadOps - prevReadOps;
      long dWriteOps = curWriteOps - prevWriteOps;
      long dIopsReadSleeps = curIopsReadSleeps - prevIopsReadSleeps;
      long dIopsWriteSleeps = curIopsWriteSleeps - prevIopsWriteSleeps;
      long dDeviceReadOps = curDeviceReadOps - prevDeviceReadOps;
      long dDeviceWriteOps = curDeviceWriteOps - prevDeviceWriteOps;

      // Convert per-interval deltas to per-second rates.
      double readMbps = dBytesRead / (1024.0 * 1024.0) / intervalSec;
      double writeMbps = dBytesWritten / (1024.0 * 1024.0) / intervalSec;
      double combinedMbps = readMbps + writeMbps;
      double dRIops = (double) dDeviceReadOps / intervalSec;
      double dWIops = (double) dDeviceWriteOps / intervalSec;
      double combinedDeviceIops = dRIops + dWIops;

      // Budgets are read from the first DataNode (homogeneous config assumed).
      int totalVolumes = 0;
      long budgetBytesPerSec = 0;
      int budgetIops = 0;
      int deviceLatencyUs = 0;
      if (!DATANODES.isEmpty()) {
        DataNodeContext first = DATANODES.get(0);
        budgetBytesPerSec = first.budgetBytesPerSec;
        budgetIops = first.budgetIops;
        deviceLatencyUs = first.deviceLatencyUs;
        for (DataNodeContext dn : DATANODES) {
          totalVolumes += dn.volumes.length;
        }
      }

      StringBuilder sb = new StringBuilder();
      sb.append(String.format("EBSDevice [%ds interval, %d DNs, %d vols]: ", intervalSec,
        DATANODES.size(), totalVolumes));

      if (totalVolumes > 0) {
        double perVolBwMbps = budgetBytesPerSec > 0 ? budgetBytesPerSec / (1024.0 * 1024.0) : 0;
        double aggBwMbps = perVolBwMbps * totalVolumes;
        double bwUtil = aggBwMbps > 0 ? (combinedMbps / aggBwMbps) * 100.0 : 0;
        int aggIops = totalVolumes * budgetIops;
        double iopsUtil = aggIops > 0 ? (combinedDeviceIops / aggIops) * 100.0 : 0;

        if (budgetBytesPerSec > 0) {
          sb.append(String.format(
            "BW: %dx%.0f=%.0f MB/s, R=%.1f W=%.1f combined=%.1f (%.0f%% util), "
              + "BW-sleeps: R=%d(%dms) W=%d(%dms); ",
            totalVolumes, perVolBwMbps, aggBwMbps, readMbps, writeMbps, combinedMbps, bwUtil,
            dBwReadSleeps, dBwReadSleepMs, dBwWriteSleeps, dBwWriteSleepMs));
        }
        if (budgetIops > 0) {
          sb.append(String.format(
            "IOPS: %dx%d=%d, app-R=%.0f app-W=%.0f dev-R=%.0f dev-W=%.0f "
              + "combined-dev=%.0f (%.0f%% util), IOPS-sleeps: R=%d W=%d; ",
            totalVolumes, budgetIops, aggIops, (double) dReadOps / intervalSec,
            (double) dWriteOps / intervalSec, dRIops, dWIops, combinedDeviceIops, iopsUtil,
            dIopsReadSleeps, dIopsWriteSleeps));
        }

        // Per-volume utilization, tracked with its own prev snapshot map keyed
        // by "d<dn>v<vol>" so the delta survives volume iteration order.
        if (budgetIops > 0) {
          sb.append("per-vol:[");
          boolean first2 = true;
          for (int d = 0; d < DATANODES.size(); d++) {
            DataNodeContext dn = DATANODES.get(d);
            for (int i = 0; i < dn.volumes.length; i++) {
              if (!first2) sb.append(' ');
              first2 = false;
              long volR = dn.volumes[i].volumeDeviceReadOps.get();
              long volW = dn.volumes[i].volumeDeviceWriteOps.get();
              long curVolOps = volR + volW;
              String volKey = "d" + d + "v" + i;
              long prevVolOps = prevPerVolDeviceOps.getOrDefault(volKey, 0L);
              long dVolOps = curVolOps - prevVolOps;
              prevPerVolDeviceOps.put(volKey, curVolOps);
              double volIops = (double) dVolOps / intervalSec;
              double volUtil = budgetIops > 0 ? (volIops / budgetIops) * 100.0 : 0;
              if (DATANODES.size() > 1) {
                sb.append(String.format("d%dv%d=%.0f%%", d, i, volUtil));
              } else {
                sb.append(String.format("v%d=%.0f%%", i, volUtil));
              }
            }
          }
          sb.append("]; ");
        }
      }

      if (deviceLatencyUs > 0) {
        // Latency stats are reported cumulatively, not as interval deltas.
        long curLatencySleeps = getLatencySleepCount();
        long curLatencyUs = getLatencySleepTimeUs();
        sb.append(
          String.format("lat-sleeps=%d(%.1fs); ", curLatencySleeps, curLatencyUs / 1_000_000.0));
      }
      sb.append(String.format("cumulative: R=%s W=%s, R-ops=%d W-ops=%d, dev-R=%d dev-W=%d",
        formatBytes(curBytesRead), formatBytes(curBytesWritten), curReadOps, curWriteOps,
        curDeviceReadOps, curDeviceWriteOps));
      LOG.info(sb.toString());

      prevBytesRead = curBytesRead;
      prevBytesWritten = curBytesWritten;
      prevBwReadSleepMs = curBwReadSleepMs;
      prevBwWriteSleepMs = curBwWriteSleepMs;
      prevBwReadSleepCount = curBwReadSleepCount;
      prevBwWriteSleepCount = curBwWriteSleepCount;
      prevReadOps = curReadOps;
      prevWriteOps = curWriteOps;
      prevIopsReadSleeps = curIopsReadSleeps;
      prevIopsWriteSleeps = curIopsWriteSleeps;
      prevDeviceReadOps = curDeviceReadOps;
      prevDeviceWriteOps = curDeviceWriteOps;
    }
  }

  /** Formats a byte count as "N bytes", "x.xx MB" or "x.xx GB". */
  static String formatBytes(long bytes) {
    if (bytes >= 1024L * 1024L * 1024L) {
      return String.format("%.2f GB", bytes / (1024.0 * 1024.0 * 1024.0));
    } else if (bytes >= 1024L * 1024L) {
      return String.format("%.2f MB", bytes / (1024.0 * 1024.0));
    } else {
      return String.format("%d bytes", bytes);
    }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.devsim;

import java.util.concurrent.atomic.AtomicLong;

/**
 * Models a single physical EBS volume with independent BW and IOPS budgets and device-level
 * coalescing. The synchronized methods naturally serialize IOs to the same volume, which is
 * realistic -- a physical disk processes one IO at a time.
 * <p>
 * Read and write coalescing buffers are independent: a write to the same volume does not break read
 * coalescing (they are separate physical operations at the device level). However, reads and writes
 * share the per-volume IOPS budget.
 * <p>
 * Each instance is fully self-contained with its own counters. Aggregation across volumes and
 * DataNodes is handled by {@link EBSDevice}.
 */
public class EBSVolumeDevice {

  final int id;
  final IOBudget bwBudget;
  final IOBudget iopsBudget;
  final int maxIoSizeBytes;
  final int deviceLatencyUs;

  // Read-coalescing state: identity of the last reading stream, the file
  // position its last read ended at, and how many bytes are buffered toward
  // the next coalesced device IO. Guarded by this volume's monitor.
  private Object lastReader;
  private long lastReadEnd = -1;
  private long pendingReadBytes;

  // Write-coalescing state (independent of the read state; writes coalesce by
  // stream identity only, no position check).
  private Object lastWriter;
  private long pendingWriteBytes;

  // Per-volume counters
  final AtomicLong volumeDeviceReadOps = new AtomicLong();
  final AtomicLong volumeDeviceWriteOps = new AtomicLong();
  final AtomicLong totalBytesRead = new AtomicLong();
  final AtomicLong totalBytesWritten = new AtomicLong();
  final AtomicLong readOpCount = new AtomicLong();
  final AtomicLong writeOpCount = new AtomicLong();
  final AtomicLong bwReadSleepTimeMs = new AtomicLong();
  final AtomicLong bwReadSleepCount = new AtomicLong();
  final AtomicLong bwWriteSleepTimeMs = new AtomicLong();
  final AtomicLong bwWriteSleepCount = new AtomicLong();
  final AtomicLong iopsReadSleepTimeMs = new AtomicLong();
  final AtomicLong iopsReadSleepCount = new AtomicLong();
  final AtomicLong iopsWriteSleepTimeMs = new AtomicLong();
  final AtomicLong iopsWriteSleepCount = new AtomicLong();
  final AtomicLong latencySleepCount = new AtomicLong();
  final AtomicLong latencySleepTimeUs = new AtomicLong();

  /**
   * @param id volume index within the DataNode
   * @param bwBudget per-volume BW token bucket (null = unlimited)
   * @param iopsBudget per-volume IOPS token bucket (null = unlimited)
   * @param maxIoSizeBytes max coalesced IO size in bytes (0 = no coalescing)
   * @param deviceLatencyUs per-IO device latency in microseconds (0 = disabled)
   */
  public EBSVolumeDevice(int id, IOBudget bwBudget, IOBudget iopsBudget, int maxIoSizeBytes,
    int deviceLatencyUs) {
    this.id = id;
    this.bwBudget = bwBudget;
    this.iopsBudget = iopsBudget;
    this.maxIoSizeBytes = maxIoSizeBytes;
    this.deviceLatencyUs = deviceLatencyUs;
  }

  public int getId() {
    return id;
  }

  /**
   * Account a read operation against this volume's BW and IOPS budgets. Sequential reads from the
   * same stream are coalesced up to {@code maxIoSizeBytes} before consuming an IOPS token.
   * Interleaving (a different stream, or a non-sequential position) flushes pending coalesced IO.
   */
  // NOTE(review): device latency and device-op counting only happen inside
  // consumeReadIops(), so a config with deviceLatencyUs > 0 but no IOPS budget
  // applies no latency at all on this path -- confirm this is intended.
  public synchronized void accountRead(Object stream, long position, int bytes) {
    readOpCount.incrementAndGet();
    totalBytesRead.addAndGet(bytes);
    if (iopsBudget != null) {
      if (maxIoSizeBytes > 0) {
        // New stream or a seek breaks sequentiality: charge the buffered IO now.
        if (lastReader != stream || (lastReadEnd >= 0 && position != lastReadEnd)) {
          flushPendingReadIopsInternal();
        }
        lastReader = stream;
        pendingReadBytes += bytes;
        lastReadEnd = position + bytes;
        // Charge one device IO per full maxIoSizeBytes chunk accumulated.
        while (pendingReadBytes >= maxIoSizeBytes) {
          pendingReadBytes -= maxIoSizeBytes;
          consumeReadIops();
        }
      } else {
        consumeReadIops();
      }
    }
    if (bwBudget != null) {
      long slept = bwBudget.consume(bytes);
      if (slept > 0) {
        bwReadSleepTimeMs.addAndGet(slept);
        bwReadSleepCount.incrementAndGet();
      }
    }
  }

  /** Flushes any partially coalesced read IO, but only if {@code stream} owns it. */
  public synchronized void flushPendingReadIops(Object stream) {
    if (lastReader == stream) {
      flushPendingReadIopsInternal();
    }
  }

  // Charges one device IO for any partial buffered read bytes and clears the
  // read-coalescing state. Caller must hold the monitor.
  private void flushPendingReadIopsInternal() {
    if (pendingReadBytes > 0 && iopsBudget != null) {
      consumeReadIops();
    }
    pendingReadBytes = 0;
    lastReader = null;
    lastReadEnd = -1;
  }

  // Consumes one read IOPS token (may block), counts the device IO and applies
  // simulated device latency. Only called when iopsBudget != null.
  private void consumeReadIops() {
    volumeDeviceReadOps.incrementAndGet();
    long slept = iopsBudget.consume(1);
    if (slept > 0) {
      iopsReadSleepTimeMs.addAndGet(slept);
      iopsReadSleepCount.incrementAndGet();
    }
    applyDeviceLatency();
  }

  /**
   * Account a write operation against this volume's BW and IOPS budgets. Sequential writes from the
   * same stream are coalesced up to {@code maxIoSizeBytes}.
   */
  public synchronized void accountWrite(Object stream, int bytes) {
    writeOpCount.incrementAndGet();
    totalBytesWritten.addAndGet(bytes);
    if (iopsBudget != null) {
      if (maxIoSizeBytes > 0) {
        // Unlike reads, only a change of stream breaks write coalescing; there
        // is no position check (appends are assumed sequential).
        if (lastWriter != stream) {
          flushPendingWriteIopsInternal();
        }
        lastWriter = stream;
        pendingWriteBytes += bytes;
        while (pendingWriteBytes >= maxIoSizeBytes) {
          pendingWriteBytes -= maxIoSizeBytes;
          consumeWriteIops();
        }
      } else {
        consumeWriteIops();
      }
    }
    if (bwBudget != null) {
      long slept = bwBudget.consume(bytes);
      if (slept > 0) {
        bwWriteSleepTimeMs.addAndGet(slept);
        bwWriteSleepCount.incrementAndGet();
      }
    }
  }

  /**
   * Charge a bulk write against this volume's budgets. Used for block-level write throttling at
   * finalize time, where the total bytes are known but were not individually intercepted.
   * @param totalBytes total bytes written
   */
  public synchronized void accountBulkWrite(long totalBytes) {
    writeOpCount.incrementAndGet();
    totalBytesWritten.addAndGet(totalBytes);
    if (iopsBudget != null) {
      // Ceiling division: one device IO per maxIoSizeBytes chunk.
      // NOTE(review): the int cast caps opsToCharge for totalBytes beyond
      // ~2^31 * maxIoSizeBytes -- harmless at realistic block sizes.
      int opsToCharge =
        maxIoSizeBytes > 0 ? (int) ((totalBytes + maxIoSizeBytes - 1) / maxIoSizeBytes) : 1;
      for (int i = 0; i < opsToCharge; i++) {
        consumeWriteIops();
      }
    }
    if (bwBudget != null) {
      long slept = bwBudget.consume(totalBytes);
      if (slept > 0) {
        bwWriteSleepTimeMs.addAndGet(slept);
        bwWriteSleepCount.incrementAndGet();
      }
    }
  }

  /** Flushes any partially coalesced write IO, but only if {@code stream} owns it. */
  public synchronized void flushPendingWriteIops(Object stream) {
    if (lastWriter == stream) {
      flushPendingWriteIopsInternal();
    }
  }

  // Charges one device IO for any partial buffered write bytes and clears the
  // write-coalescing state. Caller must hold the monitor.
  private void flushPendingWriteIopsInternal() {
    if (pendingWriteBytes > 0 && iopsBudget != null) {
      consumeWriteIops();
    }
    pendingWriteBytes = 0;
    lastWriter = null;
  }

  // Consumes one write IOPS token (may block), counts the device IO and applies
  // simulated device latency. Only called when iopsBudget != null.
  private void consumeWriteIops() {
    volumeDeviceWriteOps.incrementAndGet();
    long slept = iopsBudget.consume(1);
    if (slept > 0) {
      iopsWriteSleepTimeMs.addAndGet(slept);
      iopsWriteSleepCount.incrementAndGet();
    }
    applyDeviceLatency();
  }

  // Sleeps for the configured per-IO latency, measuring the actual time slept
  // (Thread.sleep granularity may exceed the requested microseconds).
  private void applyDeviceLatency() {
    if (deviceLatencyUs > 0) {
      latencySleepCount.incrementAndGet();
      long startNs = System.nanoTime();
      try {
        Thread.sleep(deviceLatencyUs / 1000, (deviceLatencyUs % 1000) * 1000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      latencySleepTimeUs.addAndGet((System.nanoTime() - startNs) / 1000);
    }
  }

  /** Clears coalescing state, all counters, and resets both token buckets. */
  public synchronized void reset() {
    lastReader = null;
    lastReadEnd = -1;
    pendingReadBytes = 0;
    lastWriter = null;
    pendingWriteBytes = 0;
    volumeDeviceReadOps.set(0);
    volumeDeviceWriteOps.set(0);
    totalBytesRead.set(0);
    totalBytesWritten.set(0);
    readOpCount.set(0);
    writeOpCount.set(0);
    bwReadSleepTimeMs.set(0);
    bwReadSleepCount.set(0);
    bwWriteSleepTimeMs.set(0);
    bwWriteSleepCount.set(0);
    iopsReadSleepTimeMs.set(0);
    iopsReadSleepCount.set(0);
    iopsWriteSleepTimeMs.set(0);
    iopsWriteSleepCount.set(0);
    latencySleepCount.set(0);
    latencySleepTimeUs.set(0);
    if (bwBudget != null) {
      bwBudget.reset();
    }
    if (iopsBudget != null) {
      iopsBudget.reset();
    }
  }
}
/**
 * Token-bucket rate limiter. Supports a configurable tokens-per-second rate with windowed refill.
 * Used for per-volume BW budgets, per-volume IOPS budgets, and instance-level aggregate BW caps.
 * <p>
 * When all tokens in the current window are consumed, {@link #consume(long)} blocks until the next
 * window opens, providing backpressure to the caller.
 * <p>
 * Thread-safe: all state is guarded by the instance monitor.
 */
public class IOBudget {

  private final long tokensPerSec;
  private final long tokensPerWindow;
  private final long windowMs;
  // Low-rate mode only: milliseconds between consecutive tokens.
  private final long millisPerToken;
  // True when the configured rate yields zero whole tokens per window; tokens
  // are then scheduled one at a time instead of per-window.
  private final boolean lowRateMode;
  private long availableTokens;
  private long windowStartTime;
  // Low-rate mode only: absolute time at which the next token becomes available.
  private long nextTokenTimeMs;

  /**
   * @param tokensPerSec tokens replenished per second (e.g. bytes/sec for BW, ops/sec for IOPS);
   *          0 or negative disables throttling entirely
   * @param windowMs refill window duration in milliseconds
   */
  public IOBudget(long tokensPerSec, long windowMs) {
    this.tokensPerSec = tokensPerSec;
    this.windowMs = windowMs;
    this.tokensPerWindow = tokensPerSec * windowMs / 1000;
    this.lowRateMode = tokensPerSec > 0 && this.tokensPerWindow == 0;
    // Ceiling division so the effective rate never exceeds tokensPerSec.
    this.millisPerToken = lowRateMode ? Math.max(1, (1000 + tokensPerSec - 1) / tokensPerSec) : 0;
    this.availableTokens = lowRateMode ? 0 : tokensPerWindow;
    this.windowStartTime = System.currentTimeMillis();
    this.nextTokenTimeMs = this.windowStartTime;
  }

  /** Restores a full window of tokens and restarts the schedule from now. */
  public synchronized void reset() {
    this.availableTokens = lowRateMode ? 0 : tokensPerWindow;
    this.windowStartTime = System.currentTimeMillis();
    this.nextTokenTimeMs = this.windowStartTime;
  }

  /**
   * Consume tokens from the budget. Blocks (sleeps) if the budget is exhausted until enough tokens
   * are available. If the calling thread is interrupted, the interrupt flag is restored and the
   * method returns early without consuming the remaining tokens.
   * @param tokens number of tokens to consume
   * @return total milliseconds spent waiting for tokens
   */
  public synchronized long consume(long tokens) {
    if (tokens <= 0) {
      return 0;
    }
    if (tokensPerSec <= 0) {
      // Unlimited budget: never blocks.
      return 0;
    }
    if (lowRateMode) {
      return consumeLowRate(tokens);
    }
    long totalSlept = 0;
    long remaining = tokens;
    while (remaining > 0) {
      long now = System.currentTimeMillis();
      long elapsed = now - windowStartTime;
      if (elapsed >= windowMs) {
        // One or more windows have passed: refill and advance the window start
        // in whole-window steps so window boundaries stay aligned.
        long windowsPassed = elapsed / windowMs;
        availableTokens = tokensPerWindow;
        windowStartTime += windowsPassed * windowMs;
      }
      long toConsume = Math.min(remaining, availableTokens);
      if (toConsume > 0) {
        availableTokens -= toConsume;
        remaining -= toConsume;
      }
      if (remaining > 0) {
        // Sleep until the current window ends; the outer loop re-checks state
        // on wakeup, so spurious/early wakeups are harmless here.
        long sleepTime = windowMs - (System.currentTimeMillis() - windowStartTime);
        if (sleepTime <= 0) {
          sleepTime = 1;
        }
        long sleepStart = System.currentTimeMillis();
        try {
          wait(sleepTime);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          totalSlept += System.currentTimeMillis() - sleepStart;
          break;
        }
        // Account the time actually waited (wait() may return early).
        totalSlept += System.currentTimeMillis() - sleepStart;
      }
    }
    notifyAll();
    return totalSlept;
  }

  /**
   * Low-rate path for configurations where {@code tokensPerWindow == 0}. In this case we schedule
   * one token every {@code millisPerToken} and block callers until the next token time.
   * <p>
   * The wait is performed in a loop: {@code wait()} may return early on a spurious wakeup or on a
   * {@code notifyAll()} from another consumer, and proceeding before the token time would let
   * callers burst past the configured rate.
   */
  private long consumeLowRate(long tokens) {
    long totalSlept = 0;
    for (long i = 0; i < tokens; i++) {
      long now = System.currentTimeMillis();
      while (now < nextTokenTimeMs) {
        try {
          wait(nextTokenTimeMs - now);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          totalSlept += System.currentTimeMillis() - now;
          notifyAll();
          return totalSlept;
        }
        long woke = System.currentTimeMillis();
        totalSlept += woke - now;
        now = woke;
      }
      // No backlog of waiters: restart the schedule from now so idle periods
      // do not accumulate token credit.
      if (now > nextTokenTimeMs) {
        nextTokenTimeMs = now;
      }
      nextTokenTimeMs += millisPerToken;
    }
    notifyAll();
    return totalSlept;
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.devsim;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test verifying that the EBS device layer proxy is correctly installed by
 * MiniDFSCluster and intercepts read and write IO at the DataNode storage level.
 * <p>
 * Starts a single-DataNode MiniDFSCluster with 2 storage volumes and the EBS device layer
 * configured with high bandwidth/IOPS budgets (so throttling does not slow the test) but with
 * device latency disabled. Writes data through HBase, flushes, reads it back via scan and get, and
 * asserts that the device layer metrics reflect the IO.
 */
@Category({ IOTests.class, MediumTests.class })
public class TestEBSDeviceLayer {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestEBSDeviceLayer.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestEBSDeviceLayer.class);

  private static final TableName TABLE_NAME = TableName.valueOf("TestEBSDeviceLayer");
  private static final byte[] FAMILY = Bytes.toBytes("f");
  private static final byte[] QUALIFIER = Bytes.toBytes("q");
  private static final int NUM_ROWS = 200;
  private static final int VALUE_SIZE = 4096;
  private static final int NUM_VOLUMES = 2;

  private static HBaseTestingUtil UTIL;

  @BeforeClass
  public static void setUp() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Keep the table in one region so all IO hits a single RegionServer.
    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
      "org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy");
    // Short-circuit reads would bypass the DataNode (and hence the device
    // layer), so force reads through the DN.
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    // Generous budgets (1000 MB/s, 100k IOPS, 1MB max IO, no latency): the
    // device layer must observe IO without materially slowing the test.
    EBSDevice.configure(conf, 1000, 100000, 0, 1024, 0);

    UTIL = new HBaseTestingUtil(conf);
    // The DFS cluster is built manually (not via startMiniCluster) so we can
    // request multiple storage volumes per DataNode.
    UTIL.startMiniZKCluster();
    MiniDFSCluster dfsCluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).storagesPerDatanode(NUM_VOLUMES).build();
    dfsCluster.waitClusterUp();
    UTIL.setDFSCluster(dfsCluster);
    UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    // Shut down the device layer first so no reporter thread outlives the cluster.
    EBSDevice.shutdown();
    if (UTIL != null) {
      UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Exercises the full write -> flush -> scan -> get cycle and checks the device-layer
   * counters after each phase. Metrics are reset between phases, so the
   * per-phase assertions only see IO caused by that phase.
   */
  @Test
  public void testDeviceLayerInterceptsIO() throws Exception {
    assertEquals("Expected 1 DataNode registered with EBSDevice", 1, EBSDevice.getNumDataNodes());
    EBSDevice.DataNodeContext dnCtx = EBSDevice.getDataNodeContext(0);
    assertEquals("Expected " + NUM_VOLUMES + " volumes", NUM_VOLUMES, dnCtx.getNumVolumes());

    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(
      ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(64 * 1024).build()).build();
    UTIL.getAdmin().createTable(desc);
    UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);

    // Phase 1: write + flush. Reset first so table-creation IO is excluded.
    EBSDevice.resetMetrics();

    byte[] value = new byte[VALUE_SIZE];
    Bytes.random(value);
    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
      for (int i = 0; i < NUM_ROWS; i++) {
        Put put = new Put(Bytes.toBytes(String.format("row-%05d", i)));
        put.addColumn(FAMILY, QUALIFIER, value);
        table.put(put);
      }
    }

    UTIL.getAdmin().flush(TABLE_NAME);
    waitForFlush();

    long writeBytesAfterFlush = EBSDevice.getTotalBytesWritten();
    long writeInterceptsAfterFlush = EBSDevice.getWriteInterceptCount();
    LOG.info("After write+flush: bytesWritten={}, writeIntercepts={}, deviceWriteOps={}",
      writeBytesAfterFlush, writeInterceptsAfterFlush, EBSDevice.getDeviceWriteOps());

    assertTrue("Expected write intercepts after flush, got " + writeInterceptsAfterFlush,
      writeInterceptsAfterFlush > 0);
    assertTrue("Expected bytes written > 0 after flush, got " + writeBytesAfterFlush,
      writeBytesAfterFlush > 0);

    // Phase 2: full-table scan must produce read intercepts and device read ops.
    EBSDevice.resetMetrics();
    int rowCount = 0;
    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
      try (ResultScanner scanner = table.getScanner(new Scan())) {
        Result result;
        while ((result = scanner.next()) != null) {
          assertTrue("Row should not be empty", !result.isEmpty());
          rowCount++;
        }
      }
    }
    assertEquals("Expected to read back all rows", NUM_ROWS, rowCount);

    long readBytes = EBSDevice.getTotalBytesRead();
    long readIntercepts = EBSDevice.getReadInterceptCount();
    long deviceReadOps = EBSDevice.getDeviceReadOps();
    long readOps = EBSDevice.getReadOpCount();
    LOG.info("After scan: bytesRead={}, readIntercepts={}, appReadOps={}, deviceReadOps={}",
      readBytes, readIntercepts, readOps, deviceReadOps);

    assertTrue("Expected read intercepts after scan, got " + readIntercepts, readIntercepts > 0);
    assertTrue("Expected bytes read > 0 after scan, got " + readBytes, readBytes > 0);
    assertTrue("Expected device read ops > 0 (IOPS coalescing should still produce ops), got "
      + deviceReadOps, deviceReadOps > 0);

    // Phase 3: a single point Get must also be intercepted.
    EBSDevice.resetMetrics();
    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
      Result result = table.get(new Get(Bytes.toBytes("row-00050")));
      assertTrue("Get should return data", !result.isEmpty());
    }
    long getReadIntercepts = EBSDevice.getReadInterceptCount();
    LOG.info("After get: readIntercepts={}, bytesRead={}", getReadIntercepts,
      EBSDevice.getTotalBytesRead());
    assertTrue("Expected read intercepts after get, got " + getReadIntercepts,
      getReadIntercepts > 0);

    // Sanity: virtually every intercepted IO should map to a known volume.
    // NOTE(review): only phase-3 intercepts remain after the last reset, so
    // this ratio covers the Get phase only -- confirm that is intended.
    long totalIntercepts = EBSDevice.getReadInterceptCount() + EBSDevice.getWriteInterceptCount();
    long unresolved = EBSDevice.getUnresolvedVolumeCount();
    if (totalIntercepts > 0) {
      double unresolvedRatio = (double) unresolved / totalIntercepts;
      assertTrue("Unresolved volume ratio too high: " + unresolvedRatio + " (unresolved="
        + unresolved + ", total=" + totalIntercepts + ")", unresolvedRatio <= 0.01);
    }

    LOG.info("Per-volume stats: {}", EBSDevice.getPerVolumeStats());
  }

  // Polls until every region's memstore is empty (flush complete) or the
  // 60-second deadline passes.
  private void waitForFlush() throws Exception {
    long deadline = System.currentTimeMillis() + 60000;
    while (System.currentTimeMillis() < deadline) {
      long memstoreSize = 0;
      for (HRegion region : UTIL.getMiniHBaseCluster().getRegionServer(0).getRegions(TABLE_NAME)) {
        memstoreSize += region.getMemStoreDataSize();
      }
      if (memstoreSize == 0) {
        return;
      }
      Thread.sleep(500);
    }
    throw new IOException("Flush did not complete within timeout");
  }
}
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/TestIOBudget.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/TestIOBudget.java
new file mode 100644
index 000000000000..58f79b0e56a0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/TestIOBudget.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.devsim;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/** Unit tests for {@code IOBudget} token-bucket style throttling. */
+@Category({ IOTests.class, SmallTests.class })
+public class TestIOBudget {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestIOBudget.class);
+
+  /**
+   * At 2 tokens/sec with 100ms windows there is less than one token per window (0.2). Verifies the
+   * budget still makes forward progress in that regime (no deadlock) while throttling the second
+   * token by roughly the expected ~500ms inter-token gap.
+   */
+  @Test
+  public void testLowRateModeDoesNotDeadlock() {
+    IOBudget budget = new IOBudget(2, 100);
+    long t0 = System.currentTimeMillis();
+    budget.consume(1);
+    long t1 = System.currentTimeMillis();
+    budget.consume(1);
+    long elapsedSecondTokenMs = System.currentTimeMillis() - t1;
+
+    // At 2 tokens/sec, second token should require waiting roughly 500ms.
+    // Bounds are loose (300ms..3000ms) to tolerate scheduler jitter on slow CI hosts.
+    assertTrue("Expected low-rate budget to throttle second token, elapsed=" + elapsedSecondTokenMs,
+      elapsedSecondTokenMs >= 300);
+    assertTrue("Unexpectedly long low-rate throttle delay, elapsed=" + elapsedSecondTokenMs,
+      elapsedSecondTokenMs < 3000);
+    assertTrue("Clock sanity check", t1 >= t0);
+  }
+
+  /** Consuming more than one window's worth of tokens in a single call must report a sleep. */
+  @Test
+  public void testRegularWindowModeStillThrottles() {
+    // 100 tokens/sec with 100ms windows -> 10 tokens/window
+    IOBudget budget = new IOBudget(100, 100);
+    long elapsedMs = budget.consume(15);
+    assertTrue("Expected consume to sleep when exceeding window budget", elapsedMs > 0);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/ThrottledBlockInputStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/ThrottledBlockInputStream.java
new file mode 100644
index 000000000000..268fb93f9028
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/ThrottledBlockInputStream.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or
more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.devsim;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Wraps a block data {@link InputStream} returned by the DataNode's {@code getBlockInputStream}
+ * with per-byte BW and IOPS throttling against the volume's {@link EBSVolumeDevice}. Tracks the
+ * read position for sequential-IO coalescing detection.
+ * <p>
+ * Also charges the per-DataNode instance-level BW budget if configured.
+ */
+public class ThrottledBlockInputStream extends InputStream {
+
+  private final InputStream delegate;
+  private final EBSVolumeDevice volume;
+  private final EBSDevice.DataNodeContext dnContext;
+  // Current byte offset within the block; advanced on every read/skip so the volume can
+  // detect sequential access when coalescing IOPS charges.
+  private long position;
+
+  /**
+   * @param delegate the real block input stream from FsDatasetImpl
+   * @param volume the EBS volume device this block resides on
+   * @param dnContext the DataNode context for instance-level BW budget (may be null)
+   * @param offset the initial seek offset into the block
+   */
+  public ThrottledBlockInputStream(InputStream delegate, EBSVolumeDevice volume,
+    EBSDevice.DataNodeContext dnContext, long offset) {
+    this.delegate = delegate;
+    this.volume = volume;
+    this.dnContext = dnContext;
+    this.position = offset;
+  }
+
+  @Override
+  public int read() throws IOException {
+    int b = delegate.read();
+    if (b >= 0) {
+      // -1 (EOF) transfers no data, so only successful reads are charged.
+      accountRead(1);
+    }
+    return b;
+  }
+
+  // Note: read(byte[]) is inherited from InputStream and funnels into this override.
+  @Override
+  public int read(byte[] buf, int off, int len) throws IOException {
+    int bytesRead = delegate.read(buf, off, len);
+    if (bytesRead > 0) {
+      accountRead(bytesRead);
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    long skipped = delegate.skip(n);
+    if (skipped > 0) {
+      // A skip breaks sequentiality; settle any coalesced-but-unflushed IOPS charges for this
+      // stream before jumping the position forward.
+      volume.flushPendingReadIops(this);
+      position += skipped;
+    }
+    return skipped;
+  }
+
+  @Override
+  public int available() throws IOException {
+    return delegate.available();
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Flush any pending coalesced IOPS before releasing the underlying stream.
+    volume.flushPendingReadIops(this);
+    delegate.close();
+  }
+
+  // Charges the volume for `bytes` read at the current position, advances the position, and
+  // debits the DataNode instance-level bandwidth budget when one is configured.
+  private void accountRead(int bytes) {
+    volume.accountRead(this, position, bytes);
+    position += bytes;
+    if (dnContext != null) {
+      dnContext.consumeInstanceBw(bytes);
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/ThrottledFsDataset.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/ThrottledFsDataset.java
new file mode 100644
index
000000000000..81478685bb6d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/ThrottledFsDataset.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.devsim; + +import java.io.InputStream; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Dynamic proxy wrapping a real {@code FsDatasetImpl} to apply EBS volume emulation at the DataNode + * storage level. One proxy is created per DataNode by {@link ThrottledFsDatasetFactory}. + *

+ * Models the Linux ext4 page cache behavior: application writes go to the page cache with no EBS
+ * charge. EBS is engaged at three interception points on {@code FsDatasetSpi} -- one on the read
+ * path, and two on the write path where dirty pages are flushed to the block device:
+ *
+ * <ul>
+ * <li>{@code getBlockInputStream(ExtendedBlock, long)} -- wraps the returned InputStream with
+ * {@link ThrottledBlockInputStream} for per-byte read throttling</li>
+ * <li>{@code submitBackgroundSyncFileRangeRequest(ExtendedBlock, ..., nbytes, ...)} -- charges
+ * {@code nbytes} against the volume's BW and IOPS budgets, modeling the async writeback of dirty
+ * pages triggered by {@code sync_file_range(SYNC_FILE_RANGE_WRITE)} every ~8MB</li>
+ * <li>{@code finalizeBlock(ExtendedBlock, boolean)} -- charges the remaining unflushed bytes (block
+ * size minus bytes already charged via sync_file_range) against the volume's budgets, modeling the
+ * {@code fsync()} at block finalization</li>
+ * </ul>
+ * <p>
+ * Cumulative synced bytes are tracked per block to avoid double-counting between the intermediate + * sync_file_range charges and the final fsync charge. + *

+ * All other methods are delegated transparently to the inner {@code FsDatasetImpl}. + */ +public final class ThrottledFsDataset { + + private static final Logger LOG = LoggerFactory.getLogger(ThrottledFsDataset.class); + + private ThrottledFsDataset() { + } + + /** + * Create a dynamic proxy wrapping the given {@code FsDatasetSpi} delegate with EBS throttling. + * @param delegate the real FsDatasetImpl + * @param dnId DataNode identifier for metrics registration + * @param conf Hadoop configuration with EBS parameters + * @return a proxy implementing FsDatasetSpi with throttling + */ + @SuppressWarnings("unchecked") + public static > T wrap(FsDatasetSpi delegate, String dnId, + Configuration conf) { + + long bytesPerSec = conf.getLong(EBSDevice.IO_BUDGET_BYTES_PER_SEC_KEY, 0); + int iops = conf.getInt(EBSDevice.IO_BUDGET_IOPS_KEY, 0); + int windowMs = conf.getInt(EBSDevice.IO_BUDGET_WINDOW_MS_KEY, EBSDevice.DEFAULT_WINDOW_MS); + int maxIoKb = conf.getInt(EBSDevice.IO_MAX_IO_SIZE_KB_KEY, EBSDevice.DEFAULT_MAX_IO_SIZE_KB); + int maxIoSizeBytes = maxIoKb * 1024; + int instMbps = conf.getInt(EBSDevice.IO_INSTANCE_MBPS_KEY, EBSDevice.DEFAULT_INSTANCE_MBPS); + int deviceLatencyUs = + conf.getInt(EBSDevice.IO_DEVICE_LATENCY_US_KEY, EBSDevice.DEFAULT_DEVICE_LATENCY_US); + + List realVolumes = new ArrayList<>(); + try (FsDatasetSpi.FsVolumeReferences refs = delegate.getFsVolumeReferences()) { + for (FsVolumeSpi v : refs) { + realVolumes.add(v); + } + } catch (Exception e) { + throw new RuntimeException("Failed to enumerate volumes from delegate dataset", e); + } + + int numVolumes = realVolumes.size(); + EBSVolumeDevice[] ebsVolumes = new EBSVolumeDevice[numVolumes]; + Map storageIdToVolume = new HashMap<>(); + for (int i = 0; i < numVolumes; i++) { + IOBudget volBw = bytesPerSec > 0 ? new IOBudget(bytesPerSec, windowMs) : null; + IOBudget volIops = iops > 0 ? 
new IOBudget(iops, windowMs) : null; + ebsVolumes[i] = new EBSVolumeDevice(i, volBw, volIops, maxIoSizeBytes, deviceLatencyUs); + String storageId = realVolumes.get(i).getStorageID(); + storageIdToVolume.put(storageId, ebsVolumes[i]); + } + + IOBudget instanceBw = null; + if (instMbps > 0) { + long instBytesPerSec = (long) instMbps * 1024L * 1024L; + instanceBw = new IOBudget(instBytesPerSec, windowMs); + } + + EBSDevice.DataNodeContext dnContext = + EBSDevice.register(dnId, ebsVolumes, instanceBw, bytesPerSec, iops, deviceLatencyUs); + + int reportIntervalSec = conf.getInt(EBSDevice.IO_BUDGET_REPORT_INTERVAL_SEC_KEY, + EBSDevice.DEFAULT_REPORT_INTERVAL_SEC); + EBSDevice.startMetricsReporter(reportIntervalSec); + + LOG.info( + "ThrottledFsDataset: DN {} wrapped with {} EBS volumes " + + "(BW={} MB/s, IOPS={}, coalesce={}KB, latency={}us, instance_cap={} MB/s)", + dnId, numVolumes, + bytesPerSec > 0 ? String.format("%.1f", bytesPerSec / (1024.0 * 1024.0)) : "unlimited", + iops > 0 ? iops : "unlimited", maxIoKb > 0 ? maxIoKb : "off", + deviceLatencyUs > 0 ? deviceLatencyUs : "off", instMbps > 0 ? 
instMbps : "unlimited"); + + String[] interceptedMethods = + { "getBlockInputStream", "submitBackgroundSyncFileRangeRequest", "finalizeBlock" }; + for (String m : interceptedMethods) { + boolean found = false; + for (Method dm : delegate.getClass().getMethods()) { + if (dm.getName().equals(m)) { + found = true; + break; + } + } + if (!found) { + LOG.warn("EBS interception target method '{}' not found on {}; " + + "throttling for this path will be inactive", m, delegate.getClass().getName()); + } + } + + EBSInvocationHandler handler = new EBSInvocationHandler(delegate, storageIdToVolume, dnContext); + + Set> interfaces = new HashSet<>(); + collectInterfaces(delegate.getClass(), interfaces); + // Ensure FsDatasetSpi is always included + interfaces.add(FsDatasetSpi.class); + + return (T) Proxy.newProxyInstance(delegate.getClass().getClassLoader(), + interfaces.toArray(new Class[0]), handler); + } + + private static void collectInterfaces(Class clazz, Set> result) { + if (clazz == null || clazz == Object.class) { + return; + } + for (Class iface : clazz.getInterfaces()) { + if (result.add(iface)) { + collectInterfaces(iface, result); + } + } + collectInterfaces(clazz.getSuperclass(), result); + } + + private static class EBSInvocationHandler implements InvocationHandler { + + private final FsDatasetSpi delegate; + private final Map storageIdToVolume; + private final EBSDevice.DataNodeContext dnContext; + private final ConcurrentHashMap syncedBytesPerBlock = new ConcurrentHashMap<>(); + + EBSInvocationHandler(FsDatasetSpi delegate, Map storageIdToVolume, + EBSDevice.DataNodeContext dnContext) { + this.delegate = delegate; + this.storageIdToVolume = storageIdToVolume; + this.dnContext = dnContext; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + String name = method.getName(); + + // Read path: wrap the block input stream with throttling + if ( + "getBlockInputStream".equals(name) && args != null && args.length >= 
2 + && args[0] instanceof ExtendedBlock + ) { + return handleGetBlockInputStream(method, args); + } + + // Write path: intermediate async writeback of dirty pages (sync_file_range) + if ( + "submitBackgroundSyncFileRangeRequest".equals(name) && args != null && args.length >= 4 + && args[0] instanceof ExtendedBlock + ) { + return handleSyncFileRange(method, args); + } + + // Write path: final fsync at block finalization (charge remaining unflushed bytes) + if ( + "finalizeBlock".equals(name) && args != null && args.length >= 1 + && args[0] instanceof ExtendedBlock + ) { + return handleFinalizeBlock(method, args); + } + + // Object methods + if ("toString".equals(name) && (args == null || args.length == 0)) { + return "ThrottledFsDataset[" + delegate.toString() + "]"; + } + if ("hashCode".equals(name) && (args == null || args.length == 0)) { + return System.identityHashCode(proxy); + } + if ("equals".equals(name) && args != null && args.length == 1) { + return proxy == args[0]; + } + + return delegateInvoke(method, args); + } + + private Object handleGetBlockInputStream(Method method, Object[] args) throws Throwable { + ExtendedBlock block = (ExtendedBlock) args[0]; + long offset = (Long) args[1]; + EBSDevice.recordReadIntercept(); + + Object result = delegateInvoke(method, args); + if (result instanceof InputStream) { + EBSVolumeDevice vol = resolveVolume(block); + if (vol != null) { + return new ThrottledBlockInputStream((InputStream) result, vol, dnContext, offset); + } + EBSDevice.recordUnresolvedVolume(); + } + return result; + } + + /** + * Intercepts the DataNode's async dirty page writeback. In production, BlockReceiver calls this + * every ~8MB ({@code CACHE_DROP_LAG_BYTES}) to issue + * {@code sync_file_range(fd, offset, nbytes, SYNC_FILE_RANGE_WRITE)}, which triggers + * asynchronous writeback of dirty pages from the page cache to the EBS device. + *

+ * We charge {@code nbytes} against the volume's BW and IOPS budgets here, and track the + * cumulative synced bytes per block so that {@code finalizeBlock} only charges the remaining + * unflushed delta. + */ + private Object handleSyncFileRange(Method method, Object[] args) throws Throwable { + ExtendedBlock block = (ExtendedBlock) args[0]; + // args: (ExtendedBlock, ReplicaOutputStreams, long offset, long nbytes, int flags) + long nbytes = args[3] instanceof Number ? ((Number) args[3]).longValue() : 0; + EBSDevice.recordWriteIntercept(); + EBSVolumeDevice vol = resolveVolume(block); + if (vol != null && nbytes > 0) { + vol.accountBulkWrite(nbytes); + dnContext.consumeInstanceBw(nbytes); + syncedBytesPerBlock.merge(block.getBlockId(), nbytes, Long::sum); + } else if (vol == null) { + EBSDevice.recordUnresolvedVolume(); + } + return delegateInvoke(method, args); + } + + /** + * Intercepts block finalization, which in the DataNode follows immediately after + * {@code BlockReceiver.close()} (which includes {@code syncDataOut()} / fsync). Charges only + * the remaining bytes not already flushed via {@code sync_file_range} calls. 
+ */ + private Object handleFinalizeBlock(Method method, Object[] args) throws Throwable { + ExtendedBlock block = (ExtendedBlock) args[0]; + EBSDevice.recordWriteIntercept(); + EBSVolumeDevice vol = resolveVolume(block); + if (vol != null) { + long blockBytes = block.getNumBytes(); + long alreadySynced = syncedBytesPerBlock.getOrDefault(block.getBlockId(), 0L); + long remaining = Math.max(0, blockBytes - alreadySynced); + if (remaining > 0) { + vol.accountBulkWrite(remaining); + dnContext.consumeInstanceBw(remaining); + } + syncedBytesPerBlock.remove(block.getBlockId()); + } else { + EBSDevice.recordUnresolvedVolume(); + } + return delegateInvoke(method, args); + } + + private EBSVolumeDevice resolveVolume(ExtendedBlock block) { + try { + FsVolumeSpi vol = delegate.getVolume(block); + if (vol != null) { + return storageIdToVolume.get(vol.getStorageID()); + } + } catch (Exception e) { + LOG.debug("Could not resolve volume for block {}", block, e); + } + return null; + } + + private Object delegateInvoke(Method method, Object[] args) throws Throwable { + try { + return method.invoke(delegate, args); + } catch (InvocationTargetException e) { + throw e.getCause(); + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/ThrottledFsDatasetFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/ThrottledFsDatasetFactory.java new file mode 100644 index 000000000000..595afa82ccc0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/ThrottledFsDatasetFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.devsim; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataStorage; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory that creates {@link ThrottledFsDataset} proxies wrapping the standard + * {@code FsDatasetImpl}. Configured via: + * + *

+ * <pre>{@code
+ * conf.set("dfs.datanode.fsdataset.factory", ThrottledFsDatasetFactory.class.getName());
+ * }</pre>
+ * + * Hadoop calls {@link #newInstance} once per DataNode during startup. Each invocation creates an + * independent proxy with its own set of {@link EBSVolumeDevice} instances. + */ +@SuppressWarnings("rawtypes") +public class ThrottledFsDatasetFactory extends FsDatasetSpi.Factory { + + private static final Logger LOG = LoggerFactory.getLogger(ThrottledFsDatasetFactory.class); + + private static final String DEFAULT_FACTORY_CLASS = + "org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory"; + + @Override + @SuppressWarnings("unchecked") + public FsDatasetSpi newInstance(DataNode datanode, DataStorage storage, Configuration conf) + throws IOException { + FsDatasetSpi.Factory defaultFactory = loadDefaultFactory(); + FsDatasetSpi inner = defaultFactory.newInstance(datanode, storage, conf); + String dnId = datanode.getDatanodeId() != null + ? datanode.getDatanodeId().getDatanodeUuid() + : "DN-" + System.identityHashCode(datanode); + LOG.info("ThrottledFsDatasetFactory: creating EBS device layer for DataNode {}", dnId); + return ThrottledFsDataset.wrap(inner, dnId, conf); + } + + @SuppressWarnings("unchecked") + private static FsDatasetSpi.Factory loadDefaultFactory() throws IOException { + try { + Class clazz = Class.forName(DEFAULT_FACTORY_CLASS); + return (FsDatasetSpi.Factory) clazz.getDeclaredConstructor().newInstance(); + } catch (Exception e) { + throw new IOException("Failed to load default FsDataset factory: " + DEFAULT_FACTORY_CLASS, + e); + } + } +}