From abdffaa8b16803ebaef825cde6d94ebf6b366961 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Tue, 7 Apr 2026 12:57:21 -0700 Subject: [PATCH] HBASE-30062 Device layer simulator for MiniDFSCluster-based tests On EBS-backed deployments in AWS, or equivalents in other cloud infrastructure providers, HBase compaction and replication throughput can be constrained by per-volume IOPS limits rather than by bandwidth. A faithful device-level simulator within the test harness allows developers to reproduce, analyze, and validate fixes for such performance issues without requiring actual cloud infrastructure. This proposed change adds a test-only EBS device layer that operates at the DataNode storage level within MiniDFSCluster by replacing the FsDatasetSpi implementation via Hadoop's pluggable factory mechanism. This allows HBase integration tests to simulate realistic cloud block storage characteristics, such as per-volume bandwidth budgets, IOPS limits, sequential IO coalescing, and per-IO device latency, enabling identification and reproduction of IO bottlenecks. The simulator wraps the real FsDatasetImpl with a java.lang.reflect.Proxy that intercepts the three FsDatasetSpi methods where DataNode local IO actually engages the underlying block device, without compile-time coupling to internal Hadoop classes. Each proxy gets its own set of EBSVolumeDevice instances with independent budgets. Block-to-volume resolution uses delegate.getVolume(block), providing real HDFS placement decisions. A single configuration applies to all volumes, but each volume maintains its own token buckets, matching production where all block devices attached to a host share the same SKU but have independent throughput budgets, and where the host itself has a cap on maximum aggregate throughput. EBS merges sequential IOs up to 1 MiB before counting them as a single IOPS token. The simulator tracks read streams and write streams independently. 
After each IOPS token consumption, the simulator sleeps for a configurable duration (default 1 ms), modeling physical device service time. IntegrationTestCompactionWithDeviceSimulator provides an end-to-end example of using the simulator. --- ...tionTestCompactionWithDeviceSimulator.java | 564 ++++++++++++++++ .../hadoop/hbase/io/devsim/EBSDevice.java | 613 ++++++++++++++++++ .../hbase/io/devsim/EBSVolumeDevice.java | 266 ++++++++ .../hadoop/hbase/io/devsim/IOBudget.java | 133 ++++ .../hbase/io/devsim/TestEBSDeviceLayer.java | 201 ++++++ .../hadoop/hbase/io/devsim/TestIOBudget.java | 60 ++ .../io/devsim/ThrottledBlockInputStream.java | 97 +++ .../hbase/io/devsim/ThrottledFsDataset.java | 310 +++++++++ .../io/devsim/ThrottledFsDatasetFactory.java | 70 ++ 9 files changed, 2314 insertions(+) create mode 100644 hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestCompactionWithDeviceSimulator.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/EBSDevice.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/EBSVolumeDevice.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/IOBudget.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/TestEBSDeviceLayer.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/TestIOBudget.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/ThrottledBlockInputStream.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/ThrottledFsDataset.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/ThrottledFsDatasetFactory.java diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestCompactionWithDeviceSimulator.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestCompactionWithDeviceSimulator.java new file mode 100644 index 000000000000..5ed4cb92fccf 
--- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestCompactionWithDeviceSimulator.java @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.devsim.EBSDevice; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator; +import org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator; +import org.apache.hadoop.hbase.io.hfile.UncompressedBlockSizePredicator; +import org.apache.hadoop.hbase.regionserver.BloomType; +import 
org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.StoreEngine; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; +import org.apache.hadoop.hbase.testclassification.IntegrationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.util.ToolRunner; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.Sets; +import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; + +/** + * Integration test that demonstrates the EBS device layer simulator's value for diagnosing + * compaction throughput under different storage device constraints. Runs HBase major compactions + * under two simulated EBS volume bandwidth limits (constrained and baseline) with realistic HFile + * configuration (Snappy compression, tunable compression ratio, configurable block parameters) and + * produces a diagnostic throughput analysis comparing the scenarios. + *

+ * This test always starts a local MiniDFSCluster with the EBS device layer installed — the + * distributed cluster mode of IntegrationTestBase is not applicable because the device simulator + * requires the {@code ThrottledFsDatasetFactory} to be injected at DataNode startup. + *

+ * JUnit / maven-failsafe execution: + * + *

+ * mvn verify -pl hbase-it -Dtest=IntegrationTestCompactionWithDeviceSimulator
+ * 
+ * + * CLI execution via hbase script: + * + *
+ * hbase org.apache.hadoop.hbase.IntegrationTestCompactionWithDeviceSimulator \
+ *   -totalDataBytes 536870912 -constrainedBwMbps 10 -compression LZ4
+ * 
+ * + * Configurable parameters (settable via CLI flags or + * {@code -Dhbase.IntegrationTestCompactionWithDeviceSimulator.}): + * + *
+ * # Data generation
+ *   totalDataBytes          (default 1073741824 = 1 GB)
+ *   valueSize               (default 102400 = 100 KB)
+ *   numFlushCycles          (default 10)
+ *   targetCompressionRatio  (default 3.0)
+ *
+ * # HFile / column family
+ *   compression             (default SNAPPY)
+ *   dataBlockEncoding       (default NONE)
+ *   blockSize               (default 65536 = 64 KB)
+ *   bloomType               (default ROW)
+ *   blockPredicator         (default PreviousBlockCompressionRatePredicator)
+ *
+ * # Device simulator
+ *   constrainedBwMbps       (default 25)
+ *   baselineBwMbps          (default 250)
+ *   budgetIops              (default 100000)
+ *   deviceLatencyUs         (default 0)
+ *   volumesPerDataNode      (default 1)
+ * 
+ */ +@Tag(IntegrationTests.TAG) +public class IntegrationTestCompactionWithDeviceSimulator extends IntegrationTestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(IntegrationTestCompactionWithDeviceSimulator.class); + + private static final String CLASS_NAME = + IntegrationTestCompactionWithDeviceSimulator.class.getSimpleName(); + private static final String CONF_PREFIX = "hbase." + CLASS_NAME + "."; + + private static final TableName TABLE_NAME = TableName.valueOf(CLASS_NAME); + private static final byte[] FAMILY = Bytes.toBytes("f"); + private static final byte[] QUALIFIER = Bytes.toBytes("q"); + + private static final Map> PREDICATOR_CLASSES = + Map.of("PreviousBlockCompressionRatePredicator", PreviousBlockCompressionRatePredicator.class, + "UncompressedBlockSizePredicator", UncompressedBlockSizePredicator.class); + + // CLI option keys + private static final String OPT_TOTAL_DATA_BYTES = "totalDataBytes"; + private static final String OPT_VALUE_SIZE = "valueSize"; + private static final String OPT_NUM_FLUSH_CYCLES = "numFlushCycles"; + private static final String OPT_TARGET_COMPRESSION_RATIO = "targetCompressionRatio"; + private static final String OPT_COMPRESSION = "compression"; + private static final String OPT_DATA_BLOCK_ENCODING = "dataBlockEncoding"; + private static final String OPT_BLOCK_SIZE = "blockSize"; + private static final String OPT_BLOOM_TYPE = "bloomType"; + private static final String OPT_BLOCK_PREDICATOR = "blockPredicator"; + private static final String OPT_CONSTRAINED_BW_MBPS = "constrainedBwMbps"; + private static final String OPT_BASELINE_BW_MBPS = "baselineBwMbps"; + private static final String OPT_BUDGET_IOPS = "budgetIops"; + private static final String OPT_DEVICE_LATENCY_US = "deviceLatencyUs"; + private static final String OPT_VOLUMES_PER_DN = "volumesPerDataNode"; + + @Override + @BeforeEach + public void setUp() throws Exception { + util = getTestingUtil(getConf()); + } + + @Override + @AfterEach + public 
void cleanUp() throws Exception { + // Each scenario cleans up its own cluster. + } + + @Override + public void setUpCluster() throws Exception { + // Cluster lifecycle is per-scenario. + } + + @Override + public void setUpMonkey() throws Exception { + // Chaos monkey is not applicable for this test. + } + + @Override + public TableName getTablename() { + return TABLE_NAME; + } + + @Override + protected Set getColumnFamilies() { + return Sets.newHashSet(Bytes.toString(FAMILY)); + } + + @Override + protected void addOptions() { + super.addOptions(); + addOptWithArg(OPT_TOTAL_DATA_BYTES, "Total uncompressed data bytes (default 1073741824)"); + addOptWithArg(OPT_VALUE_SIZE, "Per-cell value size in bytes (default 102400)"); + addOptWithArg(OPT_NUM_FLUSH_CYCLES, "Number of flush cycles / store files (default 10)"); + addOptWithArg(OPT_TARGET_COMPRESSION_RATIO, + "Target compression ratio for generated values (default 3.0)"); + addOptWithArg(OPT_COMPRESSION, + "Compression algorithm: SNAPPY, LZ4, ZSTD, GZ, NONE " + "(default SNAPPY)"); + addOptWithArg(OPT_DATA_BLOCK_ENCODING, + "Data block encoding: NONE, PREFIX, DIFF, FAST_DIFF, ROW_INDEX_V1 (default NONE)"); + addOptWithArg(OPT_BLOCK_SIZE, "HFile data block size in bytes (default 65536)"); + addOptWithArg(OPT_BLOOM_TYPE, "Bloom filter type: NONE, ROW, ROWCOL (default ROW)"); + addOptWithArg(OPT_BLOCK_PREDICATOR, + "Block compressed size predicator class short name (default " + + "PreviousBlockCompressionRatePredicator)"); + addOptWithArg(OPT_CONSTRAINED_BW_MBPS, + "Per-volume BW limit for constrained scenario in MB/s (default 25)"); + addOptWithArg(OPT_BASELINE_BW_MBPS, + "Per-volume BW limit for baseline scenario in MB/s (default 250)"); + addOptWithArg(OPT_BUDGET_IOPS, "Per-volume IOPS budget (default 100000)"); + addOptWithArg(OPT_DEVICE_LATENCY_US, + "Per-IO device latency in microseconds, 0=disabled (default 0)"); + addOptWithArg(OPT_VOLUMES_PER_DN, "Number of storage volumes per DataNode (default 1)"); + } + + 
@Override + protected void processOptions(CommandLine cmd) { + processBaseOptions(cmd); + Configuration c = getConf(); + setOptToConf(cmd, c, OPT_TOTAL_DATA_BYTES); + setOptToConf(cmd, c, OPT_VALUE_SIZE); + setOptToConf(cmd, c, OPT_NUM_FLUSH_CYCLES); + setOptToConf(cmd, c, OPT_TARGET_COMPRESSION_RATIO); + setOptToConf(cmd, c, OPT_COMPRESSION); + setOptToConf(cmd, c, OPT_DATA_BLOCK_ENCODING); + setOptToConf(cmd, c, OPT_BLOCK_SIZE); + setOptToConf(cmd, c, OPT_BLOOM_TYPE); + setOptToConf(cmd, c, OPT_BLOCK_PREDICATOR); + setOptToConf(cmd, c, OPT_CONSTRAINED_BW_MBPS); + setOptToConf(cmd, c, OPT_BASELINE_BW_MBPS); + setOptToConf(cmd, c, OPT_BUDGET_IOPS); + setOptToConf(cmd, c, OPT_DEVICE_LATENCY_US); + setOptToConf(cmd, c, OPT_VOLUMES_PER_DN); + } + + private static void setOptToConf(CommandLine cmd, Configuration conf, String opt) { + if (cmd.hasOption(opt)) { + conf.set(CONF_PREFIX + opt, cmd.getOptionValue(opt)); + } + } + + @Override + public int runTestFromCommandLine() throws Exception { + doTestCompaction(); + return 0; + } + + @Test + public void testCompactionThroughputVariesWithDeviceBandwidth() throws Exception { + doTestCompaction(); + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + int ret = ToolRunner.run(conf, new IntegrationTestCompactionWithDeviceSimulator(), args); + System.exit(ret); + } + + private void doTestCompaction() throws Exception { + Configuration baseConf = getConf() != null ? 
getConf() : HBaseConfiguration.create(); + + long totalDataBytes = + baseConf.getLong(CONF_PREFIX + OPT_TOTAL_DATA_BYTES, 1L * 1024 * 1024 * 1024); + int valueSize = baseConf.getInt(CONF_PREFIX + OPT_VALUE_SIZE, 100 * 1024); + int numFlushCycles = baseConf.getInt(CONF_PREFIX + OPT_NUM_FLUSH_CYCLES, 10); + double targetCompressionRatio = + Double.parseDouble(baseConf.get(CONF_PREFIX + OPT_TARGET_COMPRESSION_RATIO, "3.0")); + + String compressionName = baseConf.get(CONF_PREFIX + OPT_COMPRESSION, "SNAPPY"); + String dataBlockEncodingName = baseConf.get(CONF_PREFIX + OPT_DATA_BLOCK_ENCODING, "NONE"); + int blockSize = baseConf.getInt(CONF_PREFIX + OPT_BLOCK_SIZE, 64 * 1024); + String bloomTypeName = baseConf.get(CONF_PREFIX + OPT_BLOOM_TYPE, "ROW"); + String blockPredicatorName = + baseConf.get(CONF_PREFIX + OPT_BLOCK_PREDICATOR, "PreviousBlockCompressionRatePredicator"); + + int constrainedBwMbps = baseConf.getInt(CONF_PREFIX + OPT_CONSTRAINED_BW_MBPS, 25); + int baselineBwMbps = baseConf.getInt(CONF_PREFIX + OPT_BASELINE_BW_MBPS, 250); + int budgetIops = baseConf.getInt(CONF_PREFIX + OPT_BUDGET_IOPS, 100000); + int deviceLatencyUs = baseConf.getInt(CONF_PREFIX + OPT_DEVICE_LATENCY_US, 0); + int volumesPerDataNode = baseConf.getInt(CONF_PREFIX + OPT_VOLUMES_PER_DN, 1); + + Compression.Algorithm compression = Compression.Algorithm.valueOf(compressionName); + DataBlockEncoding dataBlockEncoding = DataBlockEncoding.valueOf(dataBlockEncodingName); + BloomType bloomType = BloomType.valueOf(bloomTypeName); + + Class predicatorClass = + PREDICATOR_CLASSES.get(blockPredicatorName); + if (predicatorClass == null) { + throw new IllegalArgumentException("Unknown blockPredicator '" + blockPredicatorName + + "'. 
Known values: " + PREDICATOR_CLASSES.keySet()); + } + + LOG.info("========== Test Configuration =========="); + LOG.info(" totalDataBytes={}, valueSize={}, numFlushCycles={}, targetCompressionRatio={}", + totalDataBytes, valueSize, numFlushCycles, targetCompressionRatio); + LOG.info(" compression={}, dataBlockEncoding={}, blockSize={}, bloomType={}", compression, + dataBlockEncoding, blockSize, bloomType); + LOG.info(" blockPredicator={}", predicatorClass.getName()); + LOG.info(" constrainedBwMbps={}, baselineBwMbps={}, budgetIops={}, deviceLatencyUs={}", + constrainedBwMbps, baselineBwMbps, budgetIops, deviceLatencyUs); + LOG.info(" volumesPerDataNode={}", volumesPerDataNode); + LOG.info("========================================"); + + CompactionPerformance constrained = + runCompactionScenario("constrained", constrainedBwMbps, budgetIops, deviceLatencyUs, + volumesPerDataNode, totalDataBytes, valueSize, numFlushCycles, targetCompressionRatio, + compression, dataBlockEncoding, blockSize, bloomType, predicatorClass); + + CompactionPerformance baseline = + runCompactionScenario("baseline", baselineBwMbps, budgetIops, deviceLatencyUs, + volumesPerDataNode, totalDataBytes, valueSize, numFlushCycles, targetCompressionRatio, + compression, dataBlockEncoding, blockSize, bloomType, predicatorClass); + + constrained.log(); + baseline.log(); + double durationRatio = (double) constrained.durationMs / baseline.durationMs; + double bwRatio = (double) baselineBwMbps / constrainedBwMbps; + LOG.info("========== Cross-Scenario Comparison =========="); + LOG.info(" Duration ratio (constrained/baseline): {}", String.format("%.2f", durationRatio)); + LOG.info(" BW ratio (baseline/constrained): {}", String.format("%.2f", bwRatio)); + LOG.info(" Effective throughput: constrained={} MB/s, baseline={} MB/s", + String.format("%.2f", constrained.effectiveThroughputMbps()), + String.format("%.2f", baseline.effectiveThroughputMbps())); + 
LOG.info("================================================"); + + long constrainedBwSleeps = constrained.bwReadSleepCount + constrained.bwWriteSleepCount; + assertTrue(constrainedBwSleeps > 0, + "Expected BW throttle sleeps in constrained scenario, got 0. " + + "The device simulator did not throttle compaction I/O."); + + assertTrue(constrained.durationMs > baseline.durationMs, + "Expected constrained scenario (" + constrainedBwMbps + " MB/s, " + constrained.durationMs + + " ms) to take longer than baseline (" + baselineBwMbps + " MB/s, " + baseline.durationMs + + " ms)."); + } + + private CompactionPerformance runCompactionScenario(String label, int bwMbps, int budgetIops, + int deviceLatencyUs, int volumesPerDataNode, long totalDataBytes, int valueSize, + int numFlushCycles, double targetCompressionRatio, Compression.Algorithm compression, + DataBlockEncoding dataBlockEncoding, int blockSize, BloomType bloomType, + Class predicatorClass) throws Exception { + + LOG.info(">>> Starting scenario '{}' with device BW = {} MB/s", label, bwMbps); + + Configuration conf = HBaseConfiguration.create(); + conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, + "org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy"); + conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); + conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); + conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); + conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, + NoLimitThroughputController.class.getName()); + conf.set(BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR, + predicatorClass.getName()); + + EBSDevice.configure(conf, bwMbps, budgetIops, deviceLatencyUs); + + HBaseTestingUtil testUtil = new HBaseTestingUtil(conf); + try { + testUtil.startMiniZKCluster(); + 
MiniDFSCluster dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) + .storagesPerDatanode(volumesPerDataNode).build(); + dfsCluster.waitClusterUp(); + testUtil.setDFSCluster(dfsCluster); + testUtil.startMiniHBaseCluster(1, 1); + + ColumnFamilyDescriptorBuilder cfBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setCompressionType(compression) + .setCompactionCompressionType(compression).setDataBlockEncoding(dataBlockEncoding) + .setBlocksize(blockSize).setBloomFilterType(bloomType); + + Admin admin = testUtil.getAdmin(); + admin.createTable( + TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(cfBuilder.build()).build()); + testUtil.waitUntilAllRegionsAssigned(TABLE_NAME); + + long uncompressedDataLoaded = loadData(testUtil, totalDataBytes, valueSize, numFlushCycles, + targetCompressionRatio, admin); + + HStore store = getStore(testUtil); + int storeFilesBefore = store.getStorefilesCount(); + LOG.info(" Scenario '{}': loaded {} MB uncompressed across {} store files", label, + String.format("%.2f", uncompressedDataLoaded / (1024.0 * 1024.0)), storeFilesBefore); + + EBSDevice.resetMetrics(); + + long startTime = EnvironmentEdgeManager.currentTime(); + admin.majorCompact(TABLE_NAME); + waitForCompaction(store); + long durationMs = EnvironmentEdgeManager.currentTime() - startTime; + + CompactionPerformance result = new CompactionPerformance(label, bwMbps, durationMs, + uncompressedDataLoaded, storeFilesBefore); + + LOG.info("<<< Scenario '{}' complete in {} ms", label, durationMs); + return result; + } finally { + EBSDevice.shutdown(); + testUtil.shutdownMiniCluster(); + } + } + + private long loadData(HBaseTestingUtil testUtil, long totalDataBytes, int valueSize, + int numFlushCycles, double targetCompressionRatio, Admin admin) throws IOException { + long bytesPerFlush = totalDataBytes / numFlushCycles; + int rowsPerFlush = Math.max(1, (int) (bytesPerFlush / valueSize)); + long totalUncompressed = 0; + Random rng = new 
Random(0xCAFEBABE); + int rowCounter = 0; + + try (Table table = testUtil.getConnection().getTable(TABLE_NAME)) { + for (int cycle = 0; cycle < numFlushCycles; cycle++) { + for (int r = 0; r < rowsPerFlush; r++) { + byte[] rowKey = Bytes.toBytes(String.format("row-%010d", rowCounter++)); + byte[] value = generateCompressibleValue(rng, valueSize, targetCompressionRatio); + table.put(new Put(rowKey).addColumn(FAMILY, QUALIFIER, value)); + totalUncompressed += valueSize; + } + admin.flush(TABLE_NAME); + waitForFlush(testUtil); + if ((cycle + 1) % 5 == 0 || cycle == numFlushCycles - 1) { + LOG.info(" Flush cycle {}/{} complete ({} MB loaded so far)", cycle + 1, numFlushCycles, + String.format("%.1f", totalUncompressed / (1024.0 * 1024.0))); + } + } + } + + return totalUncompressed; + } + + /** + * Generate a byte array that compresses at approximately the requested ratio under Snappy. The + * value is split into a random (incompressible) segment and a repeated-byte (highly compressible) + * segment. For a 3:1 target ratio, approximately 1/3 of the bytes are random and 2/3 are a + * constant. 
+ */ + static byte[] generateCompressibleValue(Random rng, int size, double targetCompressionRatio) { + byte[] value = new byte[size]; + int randomBytes = Math.max(1, (int) (size / targetCompressionRatio)); + byte[] randomPart = new byte[randomBytes]; + rng.nextBytes(randomPart); + System.arraycopy(randomPart, 0, value, 0, randomBytes); + byte fill = (byte) 0x42; + for (int i = randomBytes; i < size; i++) { + value[i] = fill; + } + return value; + } + + private HStore getStore(HBaseTestingUtil testUtil) { + SingleProcessHBaseCluster cluster = testUtil.getMiniHBaseCluster(); + List rsts = cluster.getRegionServerThreads(); + for (JVMClusterUtil.RegionServerThread rst : rsts) { + HRegionServer hrs = rst.getRegionServer(); + for (Region region : hrs.getRegions(TABLE_NAME)) { + return ((HRegion) region).getStores().iterator().next(); + } + } + throw new IllegalStateException("No store found for table " + TABLE_NAME); + } + + private void waitForCompaction(HStore store) throws InterruptedException { + long deadline = EnvironmentEdgeManager.currentTime() + 600_000; + while (store.getStorefilesCount() > 1) { + if (EnvironmentEdgeManager.currentTime() > deadline) { + throw new RuntimeException("Compaction did not complete within 10 minutes. 
Store files: " + + store.getStorefilesCount()); + } + Thread.sleep(500); + } + } + + private void waitForFlush(HBaseTestingUtil testUtil) throws IOException { + long deadline = EnvironmentEdgeManager.currentTime() + 120_000; + while (EnvironmentEdgeManager.currentTime() < deadline) { + long memstoreSize = 0; + for (HRegion region : testUtil.getMiniHBaseCluster().getRegionServer(0) + .getRegions(TABLE_NAME)) { + memstoreSize += region.getMemStoreDataSize(); + } + if (memstoreSize == 0) { + return; + } + try { + Thread.sleep(200); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted waiting for flush", e); + } + } + throw new IOException("Flush did not complete within timeout"); + } + + private static class CompactionPerformance { + final String label; + final int bwMbps; + final long durationMs; + final long bytesRead; + final long bytesWritten; + final long bwReadSleepCount; + final long bwWriteSleepCount; + final long bwReadSleepTimeMs; + final long bwWriteSleepTimeMs; + final long deviceReadOps; + final long deviceWriteOps; + final long appReadOps; + final long appWriteOps; + final long iopsReadSleepCount; + final long iopsWriteSleepCount; + final long uncompressedDataBytes; + final int storeFilesBefore; + + CompactionPerformance(String label, int bwMbps, long durationMs, long uncompressedDataBytes, + int storeFilesBefore) { + this.label = label; + this.bwMbps = bwMbps; + this.durationMs = durationMs; + this.bytesRead = EBSDevice.getTotalBytesRead(); + this.bytesWritten = EBSDevice.getTotalBytesWritten(); + this.bwReadSleepCount = EBSDevice.getReadSleepCount(); + this.bwWriteSleepCount = EBSDevice.getWriteSleepCount(); + this.bwReadSleepTimeMs = EBSDevice.getReadSleepTimeMs(); + this.bwWriteSleepTimeMs = EBSDevice.getWriteSleepTimeMs(); + this.deviceReadOps = EBSDevice.getDeviceReadOps(); + this.deviceWriteOps = EBSDevice.getDeviceWriteOps(); + this.appReadOps = EBSDevice.getReadOpCount(); + 
this.appWriteOps = EBSDevice.getWriteOpCount(); + this.iopsReadSleepCount = EBSDevice.getIopsReadSleepCount(); + this.iopsWriteSleepCount = EBSDevice.getIopsWriteSleepCount(); + this.uncompressedDataBytes = uncompressedDataBytes; + this.storeFilesBefore = storeFilesBefore; + } + + double effectiveThroughputMbps() { + if (durationMs == 0) { + return Double.MAX_VALUE; + } + return (bytesRead + bytesWritten) / (1024.0 * 1024.0) / (durationMs / 1000.0); + } + + double observedCompressionRatio() { + if (bytesWritten == 0) { + return 1.0; + } + return (double) uncompressedDataBytes / bytesWritten; + } + + void log() { + double throughputMbps = effectiveThroughputMbps(); + LOG.info("=== Scenario: {} (device BW = {} MB/s) ===", label, bwMbps); + LOG.info(" Store files before compaction: {}", storeFilesBefore); + LOG.info(" Compaction duration: {} ms", durationMs); + LOG.info(" Device bytes read: {} ({} MB)", bytesRead, + String.format("%.2f", bytesRead / (1024.0 * 1024.0))); + LOG.info(" Device bytes written: {} ({} MB)", bytesWritten, + String.format("%.2f", bytesWritten / (1024.0 * 1024.0))); + LOG.info(" Effective device throughput: {} MB/s", String.format("%.2f", throughputMbps)); + LOG.info(" Observed compression ratio: {}:1 (uncompressed={} MB, on-disk-written={} MB)", + String.format("%.2f", observedCompressionRatio()), + String.format("%.2f", uncompressedDataBytes / (1024.0 * 1024.0)), + String.format("%.2f", bytesWritten / (1024.0 * 1024.0))); + LOG.info(" BW sleeps: read={}({} ms) write={}({} ms)", bwReadSleepCount, bwReadSleepTimeMs, + bwWriteSleepCount, bwWriteSleepTimeMs); + LOG.info(" App ops: read={} write={} | Device ops: read={} write={}", appReadOps, + appWriteOps, deviceReadOps, deviceWriteOps); + LOG.info(" IOPS sleeps: read={} write={}", iopsReadSleepCount, iopsWriteSleepCount); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/EBSDevice.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/EBSDevice.java new file mode 100644 index 000000000000..a713e24fbdc3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/EBSDevice.java @@ -0,0 +1,613 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.devsim; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Static registry and metrics aggregation for the EBS device layer. Each {@link ThrottledFsDataset} + * proxy (one per DataNode) registers itself here at creation time. Provides: + *
    + *
  • Per-DataNode and per-volume metric accessors + *
  • Aggregate accessors across all DataNodes (for simple single-DN tests) + *
  • Periodic metrics reporter (same style as the original ThrottledFileSystem reporter) + *
  • Configuration key constants and a {@link #configure} helper for test setup + *
+ *

+ * Test isolation: This class uses static mutable state. Tests must call {@link #shutdown()} + * in their {@code @AfterClass} method. Parallel test execution in the same JVM is not supported. + */ +public final class EBSDevice { + + private static final Logger LOG = LoggerFactory.getLogger(EBSDevice.class); + + // ---- Configuration keys ---- + + public static final String IO_BUDGET_BYTES_PER_SEC_KEY = "hbase.test.devsim.budget.bytes.per.sec"; + public static final String IO_BUDGET_IOPS_KEY = "hbase.test.devsim.budget.iops"; + public static final String IO_BUDGET_WINDOW_MS_KEY = "hbase.test.devsim.budget.window.ms"; + public static final int DEFAULT_WINDOW_MS = 100; + + public static final String IO_BUDGET_REPORT_INTERVAL_SEC_KEY = + "hbase.test.devsim.budget.report.interval.sec"; + public static final int DEFAULT_REPORT_INTERVAL_SEC = 10; + + public static final String IO_MAX_IO_SIZE_KB_KEY = "hbase.test.devsim.max.iosize.kb"; + public static final int DEFAULT_MAX_IO_SIZE_KB = 1024; + + public static final String IO_INSTANCE_MBPS_KEY = "hbase.test.devsim.instance.mbps"; + public static final int DEFAULT_INSTANCE_MBPS = 0; + + public static final String IO_DEVICE_LATENCY_US_KEY = "hbase.test.devsim.device.latency.us"; + public static final int DEFAULT_DEVICE_LATENCY_US = 1000; + + // ---- Per-DataNode context ---- + + /** + * Holds the EBS volume devices and instance-level budget for a single DataNode. One instance per + * {@link ThrottledFsDataset} proxy. 
+ */ + public static final class DataNodeContext { + private final String datanodeId; + private final EBSVolumeDevice[] volumes; + private final IOBudget instanceBwBudget; + private final long budgetBytesPerSec; + private final int budgetIops; + private final int deviceLatencyUs; + + DataNodeContext(String datanodeId, EBSVolumeDevice[] volumes, IOBudget instanceBwBudget, + long budgetBytesPerSec, int budgetIops, int deviceLatencyUs) { + this.datanodeId = datanodeId; + this.volumes = volumes; + this.instanceBwBudget = instanceBwBudget; + this.budgetBytesPerSec = budgetBytesPerSec; + this.budgetIops = budgetIops; + this.deviceLatencyUs = deviceLatencyUs; + } + + public String getDatanodeId() { + return datanodeId; + } + + public EBSVolumeDevice[] getVolumes() { + return volumes; + } + + public int getNumVolumes() { + return volumes.length; + } + + public IOBudget getInstanceBwBudget() { + return instanceBwBudget; + } + + public long getBudgetBytesPerSec() { + return budgetBytesPerSec; + } + + public int getBudgetIops() { + return budgetIops; + } + + public int getDeviceLatencyUs() { + return deviceLatencyUs; + } + + /** + * Charge bytes against the instance-level BW budget (shared across all volumes on this DN). 
+ * @return ms slept + */ + public long consumeInstanceBw(long bytes) { + if (instanceBwBudget != null) { + return instanceBwBudget.consume(bytes); + } + return 0; + } + + public void reset() { + for (EBSVolumeDevice vol : volumes) { + vol.reset(); + } + if (instanceBwBudget != null) { + instanceBwBudget.reset(); + } + } + } + + // ---- Static registry ---- + + private static final List DATANODES = new CopyOnWriteArrayList<>(); + + private static volatile ScheduledExecutorService metricsExecutor; + private static volatile ScheduledFuture metricsFuture; + private static volatile MetricsReporter metricsReporter; + private static final AtomicLong readInterceptCount = new AtomicLong(); + private static final AtomicLong writeInterceptCount = new AtomicLong(); + private static final AtomicLong unresolvedVolumeCount = new AtomicLong(); + + private EBSDevice() { + } + + /** + * Register a DataNode's EBS context. Called by {@link ThrottledFsDataset} at proxy creation. + */ + public static DataNodeContext register(String datanodeId, EBSVolumeDevice[] volumes, + IOBudget instanceBwBudget, long budgetBytesPerSec, int budgetIops, int deviceLatencyUs) { + DataNodeContext ctx = new DataNodeContext(datanodeId, volumes, instanceBwBudget, + budgetBytesPerSec, budgetIops, deviceLatencyUs); + DATANODES.add(ctx); + LOG.info( + "EBSDevice: registered DN {} with {} volumes " + + "(BW={} MB/s, IOPS={}, latency={}us per volume)", + datanodeId, volumes.length, + budgetBytesPerSec > 0 + ? String.format("%.1f", budgetBytesPerSec / (1024.0 * 1024.0)) + : "unlimited", + budgetIops > 0 ? budgetIops : "unlimited", deviceLatencyUs > 0 ? 
deviceLatencyUs : "off"); + return ctx; + } + + public static List getDataNodes() { + return DATANODES; + } + + public static DataNodeContext getDataNodeContext(int index) { + return DATANODES.get(index); + } + + public static int getNumDataNodes() { + return DATANODES.size(); + } + + // ---- Aggregate accessors (sum across all DNs and volumes) ---- + + private static long sumVolumes(java.util.function.ToLongFunction fn) { + long total = 0; + for (DataNodeContext dn : DATANODES) { + for (EBSVolumeDevice vol : dn.volumes) { + total += fn.applyAsLong(vol); + } + } + return total; + } + + public static long getTotalBytesRead() { + return sumVolumes(v -> v.totalBytesRead.get()); + } + + public static long getTotalBytesWritten() { + return sumVolumes(v -> v.totalBytesWritten.get()); + } + + public static long getReadOpCount() { + return sumVolumes(v -> v.readOpCount.get()); + } + + public static long getWriteOpCount() { + return sumVolumes(v -> v.writeOpCount.get()); + } + + public static long getDeviceReadOps() { + return sumVolumes(v -> v.volumeDeviceReadOps.get()); + } + + public static long getDeviceWriteOps() { + return sumVolumes(v -> v.volumeDeviceWriteOps.get()); + } + + public static long getReadSleepTimeMs() { + return sumVolumes(v -> v.bwReadSleepTimeMs.get()); + } + + public static long getWriteSleepTimeMs() { + return sumVolumes(v -> v.bwWriteSleepTimeMs.get()); + } + + public static long getReadSleepCount() { + return sumVolumes(v -> v.bwReadSleepCount.get()); + } + + public static long getWriteSleepCount() { + return sumVolumes(v -> v.bwWriteSleepCount.get()); + } + + public static long getIopsReadSleepTimeMs() { + return sumVolumes(v -> v.iopsReadSleepTimeMs.get()); + } + + public static long getIopsWriteSleepTimeMs() { + return sumVolumes(v -> v.iopsWriteSleepTimeMs.get()); + } + + public static long getIopsReadSleepCount() { + return sumVolumes(v -> v.iopsReadSleepCount.get()); + } + + public static long getIopsWriteSleepCount() { + return sumVolumes(v 
-> v.iopsWriteSleepCount.get()); + } + + public static long getLatencySleepCount() { + return sumVolumes(v -> v.latencySleepCount.get()); + } + + public static long getLatencySleepTimeUs() { + return sumVolumes(v -> v.latencySleepTimeUs.get()); + } + + public static int getDeviceLatencyUs() { + if (!DATANODES.isEmpty()) { + return DATANODES.get(0).deviceLatencyUs; + } + return 0; + } + + public static int getNumVolumes() { + if (!DATANODES.isEmpty()) { + return DATANODES.get(0).volumes.length; + } + return 0; + } + + public static long getBudgetBytesPerSec() { + if (!DATANODES.isEmpty()) { + return DATANODES.get(0).budgetBytesPerSec; + } + return 0; + } + + public static int getBudgetIops() { + if (!DATANODES.isEmpty()) { + return DATANODES.get(0).budgetIops; + } + return 0; + } + + public static String getPerVolumeStats() { + if (DATANODES.isEmpty()) { + return "N/A (no DataNodes registered)"; + } + StringBuilder sb = new StringBuilder(); + for (int d = 0; d < DATANODES.size(); d++) { + DataNodeContext dn = DATANODES.get(d); + if (DATANODES.size() > 1) { + if (d > 0) sb.append("; "); + sb.append("DN").append(d).append("["); + } + for (int i = 0; i < dn.volumes.length; i++) { + if (i > 0) sb.append(", "); + sb.append(String.format("v%d: R=%d W=%d", i, dn.volumes[i].volumeDeviceReadOps.get(), + dn.volumes[i].volumeDeviceWriteOps.get())); + } + if (DATANODES.size() > 1) { + sb.append("]"); + } + } + return sb.toString(); + } + + public static void recordReadIntercept() { + readInterceptCount.incrementAndGet(); + } + + public static void recordWriteIntercept() { + writeInterceptCount.incrementAndGet(); + } + + public static void recordUnresolvedVolume() { + unresolvedVolumeCount.incrementAndGet(); + } + + public static long getReadInterceptCount() { + return readInterceptCount.get(); + } + + public static long getWriteInterceptCount() { + return writeInterceptCount.get(); + } + + public static long getUnresolvedVolumeCount() { + return unresolvedVolumeCount.get(); + } 
+ + // ---- Lifecycle ---- + + public static void resetMetrics() { + MetricsReporter reporter = metricsReporter; + if (reporter != null) { + synchronized (reporter) { + for (DataNodeContext dn : DATANODES) { + dn.reset(); + } + reporter.resetBaseline(); + } + } else { + for (DataNodeContext dn : DATANODES) { + dn.reset(); + } + } + readInterceptCount.set(0); + writeInterceptCount.set(0); + unresolvedVolumeCount.set(0); + } + + public static synchronized void startMetricsReporter(int intervalSec) { + if (metricsExecutor != null) { + return; + } + metricsExecutor = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "EBSDevice-MetricsReporter"); + t.setDaemon(true); + return t; + }); + MetricsReporter reporter = new MetricsReporter(intervalSec); + metricsReporter = reporter; + metricsFuture = + metricsExecutor.scheduleAtFixedRate(reporter, intervalSec, intervalSec, TimeUnit.SECONDS); + } + + public static synchronized void stopMetricsReporter() { + ScheduledFuture f = metricsFuture; + if (f != null) { + f.cancel(false); + metricsFuture = null; + } + ScheduledExecutorService exec = metricsExecutor; + if (exec != null) { + exec.shutdownNow(); + metricsExecutor = null; + } + metricsReporter = null; + } + + /** + * Unregister all DataNodes and stop the metrics reporter. Call from test teardown. + */ + public static void shutdown() { + stopMetricsReporter(); + DATANODES.clear(); + readInterceptCount.set(0); + writeInterceptCount.set(0); + unresolvedVolumeCount.set(0); + } + + // ---- Configuration helper ---- + + /** + * Configure a Hadoop {@link Configuration} for the EBS device layer. Sets the + * {@code dfs.datanode.fsdataset.factory} key and all EBS volume properties. 
+ * @param conf Hadoop configuration to modify + * @param budgetMbps per-volume BW cap in MB/s (0 = unlimited) + * @param budgetIops per-volume IOPS cap (0 = unlimited) + * @param deviceLatencyUs per-IO device latency in microseconds (0 = disabled) + */ + public static void configure(Configuration conf, int budgetMbps, int budgetIops, + int deviceLatencyUs) { + conf.set("dfs.datanode.fsdataset.factory", ThrottledFsDatasetFactory.class.getName()); + if (budgetMbps > 0) { + conf.setLong(IO_BUDGET_BYTES_PER_SEC_KEY, (long) budgetMbps * 1024L * 1024L); + } + if (budgetIops > 0) { + conf.setInt(IO_BUDGET_IOPS_KEY, budgetIops); + } + if (deviceLatencyUs > 0) { + conf.setInt(IO_DEVICE_LATENCY_US_KEY, deviceLatencyUs); + } + } + + /** + * Overload with all knobs. + */ + public static void configure(Configuration conf, int budgetMbps, int budgetIops, + int deviceLatencyUs, int maxIoSizeKb, int instanceMbps) { + configure(conf, budgetMbps, budgetIops, deviceLatencyUs); + conf.setInt(IO_MAX_IO_SIZE_KB_KEY, maxIoSizeKb); + if (instanceMbps > 0) { + conf.setInt(IO_INSTANCE_MBPS_KEY, instanceMbps); + } + } + + // ---- Periodic reporter ---- + + private static class MetricsReporter implements Runnable { + private final int intervalSec; + private long prevBytesRead; + private long prevBytesWritten; + private long prevBwReadSleepMs; + private long prevBwWriteSleepMs; + private long prevBwReadSleepCount; + private long prevBwWriteSleepCount; + private long prevReadOps; + private long prevWriteOps; + private long prevIopsReadSleeps; + private long prevIopsWriteSleeps; + private long prevDeviceReadOps; + private long prevDeviceWriteOps; + private final Map prevPerVolDeviceOps = new HashMap<>(); + + MetricsReporter(int intervalSec) { + this.intervalSec = intervalSec; + } + + synchronized void resetBaseline() { + prevBytesRead = 0; + prevBytesWritten = 0; + prevBwReadSleepMs = 0; + prevBwWriteSleepMs = 0; + prevBwReadSleepCount = 0; + prevBwWriteSleepCount = 0; + prevReadOps = 0; + 
prevWriteOps = 0; + prevIopsReadSleeps = 0; + prevIopsWriteSleeps = 0; + prevDeviceReadOps = 0; + prevDeviceWriteOps = 0; + prevPerVolDeviceOps.clear(); + } + + @Override + public synchronized void run() { + long curBytesRead = getTotalBytesRead(); + long curBytesWritten = getTotalBytesWritten(); + long curBwReadSleepMs = getReadSleepTimeMs(); + long curBwWriteSleepMs = getWriteSleepTimeMs(); + long curBwReadSleepCount = getReadSleepCount(); + long curBwWriteSleepCount = getWriteSleepCount(); + long curReadOps = getReadOpCount(); + long curWriteOps = getWriteOpCount(); + long curIopsReadSleeps = getIopsReadSleepCount(); + long curIopsWriteSleeps = getIopsWriteSleepCount(); + long curDeviceReadOps = getDeviceReadOps(); + long curDeviceWriteOps = getDeviceWriteOps(); + + long dBytesRead = curBytesRead - prevBytesRead; + long dBytesWritten = curBytesWritten - prevBytesWritten; + long dBwReadSleepMs = curBwReadSleepMs - prevBwReadSleepMs; + long dBwWriteSleepMs = curBwWriteSleepMs - prevBwWriteSleepMs; + long dBwReadSleeps = curBwReadSleepCount - prevBwReadSleepCount; + long dBwWriteSleeps = curBwWriteSleepCount - prevBwWriteSleepCount; + long dReadOps = curReadOps - prevReadOps; + long dWriteOps = curWriteOps - prevWriteOps; + long dIopsReadSleeps = curIopsReadSleeps - prevIopsReadSleeps; + long dIopsWriteSleeps = curIopsWriteSleeps - prevIopsWriteSleeps; + long dDeviceReadOps = curDeviceReadOps - prevDeviceReadOps; + long dDeviceWriteOps = curDeviceWriteOps - prevDeviceWriteOps; + + double readMbps = dBytesRead / (1024.0 * 1024.0) / intervalSec; + double writeMbps = dBytesWritten / (1024.0 * 1024.0) / intervalSec; + double combinedMbps = readMbps + writeMbps; + double dRIops = (double) dDeviceReadOps / intervalSec; + double dWIops = (double) dDeviceWriteOps / intervalSec; + double combinedDeviceIops = dRIops + dWIops; + + int totalVolumes = 0; + long budgetBytesPerSec = 0; + int budgetIops = 0; + int deviceLatencyUs = 0; + if (!DATANODES.isEmpty()) { + 
DataNodeContext first = DATANODES.get(0); + budgetBytesPerSec = first.budgetBytesPerSec; + budgetIops = first.budgetIops; + deviceLatencyUs = first.deviceLatencyUs; + for (DataNodeContext dn : DATANODES) { + totalVolumes += dn.volumes.length; + } + } + + StringBuilder sb = new StringBuilder(); + sb.append(String.format("EBSDevice [%ds interval, %d DNs, %d vols]: ", intervalSec, + DATANODES.size(), totalVolumes)); + + if (totalVolumes > 0) { + double perVolBwMbps = budgetBytesPerSec > 0 ? budgetBytesPerSec / (1024.0 * 1024.0) : 0; + double aggBwMbps = perVolBwMbps * totalVolumes; + double bwUtil = aggBwMbps > 0 ? (combinedMbps / aggBwMbps) * 100.0 : 0; + int aggIops = totalVolumes * budgetIops; + double iopsUtil = aggIops > 0 ? (combinedDeviceIops / aggIops) * 100.0 : 0; + + if (budgetBytesPerSec > 0) { + sb.append(String.format( + "BW: %dx%.0f=%.0f MB/s, R=%.1f W=%.1f combined=%.1f (%.0f%% util), " + + "BW-sleeps: R=%d(%dms) W=%d(%dms); ", + totalVolumes, perVolBwMbps, aggBwMbps, readMbps, writeMbps, combinedMbps, bwUtil, + dBwReadSleeps, dBwReadSleepMs, dBwWriteSleeps, dBwWriteSleepMs)); + } + if (budgetIops > 0) { + sb.append(String.format( + "IOPS: %dx%d=%d, app-R=%.0f app-W=%.0f dev-R=%.0f dev-W=%.0f " + + "combined-dev=%.0f (%.0f%% util), IOPS-sleeps: R=%d W=%d; ", + totalVolumes, budgetIops, aggIops, (double) dReadOps / intervalSec, + (double) dWriteOps / intervalSec, dRIops, dWIops, combinedDeviceIops, iopsUtil, + dIopsReadSleeps, dIopsWriteSleeps)); + } + + if (budgetIops > 0) { + sb.append("per-vol:["); + boolean first2 = true; + for (int d = 0; d < DATANODES.size(); d++) { + DataNodeContext dn = DATANODES.get(d); + for (int i = 0; i < dn.volumes.length; i++) { + if (!first2) sb.append(' '); + first2 = false; + long volR = dn.volumes[i].volumeDeviceReadOps.get(); + long volW = dn.volumes[i].volumeDeviceWriteOps.get(); + long curVolOps = volR + volW; + String volKey = "d" + d + "v" + i; + long prevVolOps = prevPerVolDeviceOps.getOrDefault(volKey, 0L); + 
long dVolOps = curVolOps - prevVolOps; + prevPerVolDeviceOps.put(volKey, curVolOps); + double volIops = (double) dVolOps / intervalSec; + double volUtil = budgetIops > 0 ? (volIops / budgetIops) * 100.0 : 0; + if (DATANODES.size() > 1) { + sb.append(String.format("d%dv%d=%.0f%%", d, i, volUtil)); + } else { + sb.append(String.format("v%d=%.0f%%", i, volUtil)); + } + } + } + sb.append("]; "); + } + } + + if (deviceLatencyUs > 0) { + long curLatencySleeps = getLatencySleepCount(); + long curLatencyUs = getLatencySleepTimeUs(); + sb.append( + String.format("lat-sleeps=%d(%.1fs); ", curLatencySleeps, curLatencyUs / 1_000_000.0)); + } + sb.append(String.format("cumulative: R=%s W=%s, R-ops=%d W-ops=%d, dev-R=%d dev-W=%d", + formatBytes(curBytesRead), formatBytes(curBytesWritten), curReadOps, curWriteOps, + curDeviceReadOps, curDeviceWriteOps)); + LOG.info(sb.toString()); + + prevBytesRead = curBytesRead; + prevBytesWritten = curBytesWritten; + prevBwReadSleepMs = curBwReadSleepMs; + prevBwWriteSleepMs = curBwWriteSleepMs; + prevBwReadSleepCount = curBwReadSleepCount; + prevBwWriteSleepCount = curBwWriteSleepCount; + prevReadOps = curReadOps; + prevWriteOps = curWriteOps; + prevIopsReadSleeps = curIopsReadSleeps; + prevIopsWriteSleeps = curIopsWriteSleeps; + prevDeviceReadOps = curDeviceReadOps; + prevDeviceWriteOps = curDeviceWriteOps; + } + } + + static String formatBytes(long bytes) { + if (bytes >= 1024L * 1024L * 1024L) { + return String.format("%.2f GB", bytes / (1024.0 * 1024.0 * 1024.0)); + } else if (bytes >= 1024L * 1024L) { + return String.format("%.2f MB", bytes / (1024.0 * 1024.0)); + } else { + return String.format("%d bytes", bytes); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/EBSVolumeDevice.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/devsim/EBSVolumeDevice.java new file mode 100644 index 000000000000..9bf4848db31c --- /dev/null +++ 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.devsim;

import java.util.concurrent.atomic.AtomicLong;

/**
 * Models a single physical EBS volume with independent BW and IOPS budgets and device-level
 * coalescing. The synchronized methods naturally serialize IOs to the same volume, which is
 * realistic -- a physical disk processes one IO at a time.
 * <p>
 * Read and write coalescing buffers are independent: a write to the same volume does not break read
 * coalescing (they are separate physical operations at the device level). However, reads and writes
 * share the per-volume IOPS budget.
 * <p>
 * Each instance is fully self-contained with its own counters. Aggregation across volumes and
 * DataNodes is handled by {@link EBSDevice}.
 */
public class EBSVolumeDevice {

  // Immutable configuration for this volume (package-private for EBSDevice aggregation).
  final int id;
  final IOBudget bwBudget;
  final IOBudget iopsBudget;
  final int maxIoSizeBytes;
  final int deviceLatencyUs;

  // Read-coalescing state, guarded by this object's monitor. A "stream" is an opaque identity
  // token supplied by the caller; reads are coalesced only while the same stream keeps reading
  // sequentially (position == lastReadEnd).
  private Object lastReader;
  private long lastReadEnd = -1; // file position just past the last coalesced read; -1 = none
  private long pendingReadBytes; // coalesced bytes not yet charged as an IOPS token

  // Write-coalescing state, guarded by this object's monitor. Writes carry no position (DataNode
  // block writes are append-only), so only stream identity breaks coalescing.
  private Object lastWriter;
  private long pendingWriteBytes;

  // Per-volume counters. AtomicLong so EBSDevice can read them without taking this volume's
  // monitor (readers must tolerate slight skew between related counters).
  final AtomicLong volumeDeviceReadOps = new AtomicLong();
  final AtomicLong volumeDeviceWriteOps = new AtomicLong();
  final AtomicLong totalBytesRead = new AtomicLong();
  final AtomicLong totalBytesWritten = new AtomicLong();
  final AtomicLong readOpCount = new AtomicLong();
  final AtomicLong writeOpCount = new AtomicLong();
  final AtomicLong bwReadSleepTimeMs = new AtomicLong();
  final AtomicLong bwReadSleepCount = new AtomicLong();
  final AtomicLong bwWriteSleepTimeMs = new AtomicLong();
  final AtomicLong bwWriteSleepCount = new AtomicLong();
  final AtomicLong iopsReadSleepTimeMs = new AtomicLong();
  final AtomicLong iopsReadSleepCount = new AtomicLong();
  final AtomicLong iopsWriteSleepTimeMs = new AtomicLong();
  final AtomicLong iopsWriteSleepCount = new AtomicLong();
  final AtomicLong latencySleepCount = new AtomicLong();
  final AtomicLong latencySleepTimeUs = new AtomicLong();

  /**
   * @param id volume index within the DataNode
   * @param bwBudget per-volume BW token bucket (null = unlimited)
   * @param iopsBudget per-volume IOPS token bucket (null = unlimited)
   * @param maxIoSizeBytes max coalesced IO size in bytes (0 = no coalescing)
   * @param deviceLatencyUs per-IO device latency in microseconds (0 = disabled)
   */
  public EBSVolumeDevice(int id, IOBudget bwBudget, IOBudget iopsBudget, int maxIoSizeBytes,
    int deviceLatencyUs) {
    this.id = id;
    this.bwBudget = bwBudget;
    this.iopsBudget = iopsBudget;
    this.maxIoSizeBytes = maxIoSizeBytes;
    this.deviceLatencyUs = deviceLatencyUs;
  }

  public int getId() {
    return id;
  }

  /**
   * Account a read operation against this volume's BW and IOPS budgets. Sequential reads from the
   * same stream are coalesced up to {@code maxIoSizeBytes} before consuming an IOPS token.
   * Interleaving (a different stream, or a non-sequential position) flushes pending coalesced IO.
   * <p>
   * May block while consuming tokens; callers hold this volume's monitor for the duration, which
   * intentionally serializes concurrent IO to the same volume.
   * @param stream identity token for the reading stream (compared by reference)
   * @param position file offset of this read, used to detect sequentiality
   * @param bytes number of bytes read
   */
  public synchronized void accountRead(Object stream, long position, int bytes) {
    readOpCount.incrementAndGet();
    totalBytesRead.addAndGet(bytes);
    if (iopsBudget != null) {
      if (maxIoSizeBytes > 0) {
        // A different stream, or a seek (position != lastReadEnd), ends the current coalesced
        // device IO: charge whatever was pending before starting a new one.
        if (lastReader != stream || (lastReadEnd >= 0 && position != lastReadEnd)) {
          flushPendingReadIopsInternal();
        }
        lastReader = stream;
        pendingReadBytes += bytes;
        lastReadEnd = position + bytes;
        // Every full maxIoSizeBytes of coalesced data costs one device IOPS token.
        while (pendingReadBytes >= maxIoSizeBytes) {
          pendingReadBytes -= maxIoSizeBytes;
          consumeReadIops();
        }
      } else {
        // No coalescing: one application read = one device IO.
        consumeReadIops();
      }
    }
    if (bwBudget != null) {
      long slept = bwBudget.consume(bytes);
      if (slept > 0) {
        bwReadSleepTimeMs.addAndGet(slept);
        bwReadSleepCount.incrementAndGet();
      }
    }
  }

  /**
   * Flush this stream's pending coalesced read bytes (if it is the current coalescing stream),
   * charging one final IOPS token. Call when a stream is closed.
   */
  public synchronized void flushPendingReadIops(Object stream) {
    if (lastReader == stream) {
      flushPendingReadIopsInternal();
    }
  }

  // Caller must hold the monitor. Charges any partial pending IO and clears read-coalescing state.
  private void flushPendingReadIopsInternal() {
    if (pendingReadBytes > 0 && iopsBudget != null) {
      consumeReadIops();
    }
    pendingReadBytes = 0;
    lastReader = null;
    lastReadEnd = -1;
  }

  // Consume one read IOPS token (may sleep for backpressure) and apply per-IO device latency.
  // Caller must hold the monitor; only called when iopsBudget != null.
  private void consumeReadIops() {
    volumeDeviceReadOps.incrementAndGet();
    long slept = iopsBudget.consume(1);
    if (slept > 0) {
      iopsReadSleepTimeMs.addAndGet(slept);
      iopsReadSleepCount.incrementAndGet();
    }
    applyDeviceLatency();
  }

  /**
   * Account a write operation against this volume's BW and IOPS budgets. Sequential writes from the
   * same stream are coalesced up to {@code maxIoSizeBytes}.
   * @param stream identity token for the writing stream (compared by reference)
   * @param bytes number of bytes written
   */
  public synchronized void accountWrite(Object stream, int bytes) {
    writeOpCount.incrementAndGet();
    totalBytesWritten.addAndGet(bytes);
    if (iopsBudget != null) {
      if (maxIoSizeBytes > 0) {
        // Writes are append-only, so only a change of stream breaks coalescing.
        if (lastWriter != stream) {
          flushPendingWriteIopsInternal();
        }
        lastWriter = stream;
        pendingWriteBytes += bytes;
        while (pendingWriteBytes >= maxIoSizeBytes) {
          pendingWriteBytes -= maxIoSizeBytes;
          consumeWriteIops();
        }
      } else {
        consumeWriteIops();
      }
    }
    if (bwBudget != null) {
      long slept = bwBudget.consume(bytes);
      if (slept > 0) {
        bwWriteSleepTimeMs.addAndGet(slept);
        bwWriteSleepCount.incrementAndGet();
      }
    }
  }

  /**
   * Charge a bulk write against this volume's budgets. Used for block-level write throttling at
   * finalize time, where the total bytes are known but were not individually intercepted.
   * Charges ceil(totalBytes / maxIoSizeBytes) IOPS tokens (or one, when coalescing is disabled).
   * @param totalBytes total bytes written
   */
  public synchronized void accountBulkWrite(long totalBytes) {
    writeOpCount.incrementAndGet();
    totalBytesWritten.addAndGet(totalBytes);
    if (iopsBudget != null) {
      int opsToCharge =
        maxIoSizeBytes > 0 ? (int) ((totalBytes + maxIoSizeBytes - 1) / maxIoSizeBytes) : 1;
      for (int i = 0; i < opsToCharge; i++) {
        consumeWriteIops();
      }
    }
    if (bwBudget != null) {
      long slept = bwBudget.consume(totalBytes);
      if (slept > 0) {
        bwWriteSleepTimeMs.addAndGet(slept);
        bwWriteSleepCount.incrementAndGet();
      }
    }
  }

  /**
   * Flush this stream's pending coalesced write bytes (if it is the current coalescing stream),
   * charging one final IOPS token. Call when a stream is closed.
   */
  public synchronized void flushPendingWriteIops(Object stream) {
    if (lastWriter == stream) {
      flushPendingWriteIopsInternal();
    }
  }

  // Caller must hold the monitor. Charges any partial pending IO and clears write-coalescing state.
  private void flushPendingWriteIopsInternal() {
    if (pendingWriteBytes > 0 && iopsBudget != null) {
      consumeWriteIops();
    }
    pendingWriteBytes = 0;
    lastWriter = null;
  }

  // Consume one write IOPS token (may sleep for backpressure) and apply per-IO device latency.
  // Caller must hold the monitor; only called when iopsBudget != null.
  private void consumeWriteIops() {
    volumeDeviceWriteOps.incrementAndGet();
    long slept = iopsBudget.consume(1);
    if (slept > 0) {
      iopsWriteSleepTimeMs.addAndGet(slept);
      iopsWriteSleepCount.incrementAndGet();
    }
    applyDeviceLatency();
  }

  // Sleep for the configured per-IO device service time, holding the monitor so IOs to this
  // volume are serialized like a physical disk. Records actual (measured) time slept.
  private void applyDeviceLatency() {
    if (deviceLatencyUs > 0) {
      latencySleepCount.incrementAndGet();
      long startNs = System.nanoTime();
      try {
        Thread.sleep(deviceLatencyUs / 1000, (deviceLatencyUs % 1000) * 1000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      latencySleepTimeUs.addAndGet((System.nanoTime() - startNs) / 1000);
    }
  }

  /** Clear all coalescing state and counters, and reset both token buckets. */
  public synchronized void reset() {
    lastReader = null;
    lastReadEnd = -1;
    pendingReadBytes = 0;
    lastWriter = null;
    pendingWriteBytes = 0;
    volumeDeviceReadOps.set(0);
    volumeDeviceWriteOps.set(0);
    totalBytesRead.set(0);
    totalBytesWritten.set(0);
    readOpCount.set(0);
    writeOpCount.set(0);
    bwReadSleepTimeMs.set(0);
    bwReadSleepCount.set(0);
    bwWriteSleepTimeMs.set(0);
    bwWriteSleepCount.set(0);
    iopsReadSleepTimeMs.set(0);
    iopsReadSleepCount.set(0);
    iopsWriteSleepTimeMs.set(0);
    iopsWriteSleepCount.set(0);
    latencySleepCount.set(0);
    latencySleepTimeUs.set(0);
    if (bwBudget != null) {
      bwBudget.reset();
    }
    if (iopsBudget != null) {
      iopsBudget.reset();
    }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Token-bucket rate limiter. Supports a configurable tokens-per-second rate with windowed refill.
 * Used for per-volume BW budgets, per-volume IOPS budgets, and instance-level aggregate BW caps.
 * <p>
 * When all tokens in the current window are consumed, {@link #consume(long)} blocks until the next
 * window opens, providing backpressure to the caller.
 * <p>
 * Two operating modes: the normal windowed mode refills {@code tokensPerWindow} tokens at the
 * start of each window; the low-rate mode (when the configured rate yields zero tokens per
 * window) instead schedules one token every {@code millisPerToken}.
 * <p>
 * Thread-safe: all state is guarded by this object's monitor. Blocked consumers wait on the
 * monitor (releasing it so other threads can make progress) and re-check the clock on wakeup,
 * since {@code Object.wait} may return early (spurious wakeups or {@code notifyAll} from another
 * consumer).
 */
public class IOBudget {

  private final long tokensPerSec;
  private final long tokensPerWindow;
  private final long windowMs;
  private final long millisPerToken; // low-rate mode only: ms between successive tokens
  private final boolean lowRateMode;
  private long availableTokens; // tokens remaining in the current window (windowed mode)
  private long windowStartTime; // epoch ms of the current window's start (windowed mode)
  private long nextTokenTimeMs; // epoch ms at which the next token becomes available (low-rate)

  /**
   * @param tokensPerSec tokens replenished per second (e.g. bytes/sec for BW, ops/sec for IOPS);
   *          {@code <= 0} means unlimited
   * @param windowMs refill window duration in milliseconds; must be positive when a rate is set
   * @throws IllegalArgumentException if {@code tokensPerSec > 0} and {@code windowMs <= 0}
   */
  public IOBudget(long tokensPerSec, long windowMs) {
    if (tokensPerSec > 0 && windowMs <= 0) {
      throw new IllegalArgumentException("windowMs must be positive, got " + windowMs);
    }
    this.tokensPerSec = tokensPerSec;
    this.windowMs = windowMs;
    this.tokensPerWindow = tokensPerSec * windowMs / 1000;
    // Low-rate mode: the rate is so low (or the window so short) that a window holds less than
    // one whole token; pace tokens individually instead.
    this.lowRateMode = tokensPerSec > 0 && this.tokensPerWindow == 0;
    this.millisPerToken = lowRateMode ? Math.max(1, (1000 + tokensPerSec - 1) / tokensPerSec) : 0;
    this.availableTokens = lowRateMode ? 0 : tokensPerWindow;
    this.windowStartTime = System.currentTimeMillis();
    this.nextTokenTimeMs = this.windowStartTime;
  }

  /** Restore a full window of tokens and restart the schedule from now. */
  public synchronized void reset() {
    this.availableTokens = lowRateMode ? 0 : tokensPerWindow;
    this.windowStartTime = System.currentTimeMillis();
    this.nextTokenTimeMs = this.windowStartTime;
  }

  /**
   * Consume tokens from the budget. Blocks (waiting on this monitor, so other threads may
   * interleave) if the budget is exhausted until enough tokens are available. If interrupted,
   * re-asserts the interrupt flag and returns early with the time slept so far.
   * @param tokens number of tokens to consume; {@code <= 0} is a no-op
   * @return total milliseconds actually slept waiting for tokens
   */
  public synchronized long consume(long tokens) {
    if (tokens <= 0 || tokensPerSec <= 0) {
      return 0; // nothing to charge, or unlimited budget
    }
    if (lowRateMode) {
      return consumeLowRate(tokens);
    }
    long totalSlept = 0;
    long remaining = tokens;
    while (remaining > 0) {
      long now = System.currentTimeMillis();
      long elapsed = now - windowStartTime;
      if (elapsed >= windowMs) {
        // One or more windows have passed: refill (tokens do not accumulate across windows)
        // and advance the window start so partial elapsed time carries into the new window.
        long windowsPassed = elapsed / windowMs;
        availableTokens = tokensPerWindow;
        windowStartTime += windowsPassed * windowMs;
      }
      long toConsume = Math.min(remaining, availableTokens);
      if (toConsume > 0) {
        availableTokens -= toConsume;
        remaining -= toConsume;
      }
      if (remaining > 0) {
        // Window exhausted: wait until the next window opens. Measure actual elapsed time --
        // wait() can return early -- and loop to re-check; the refill check above corrects for
        // any early or late wakeup.
        long sleepTime = windowMs - (System.currentTimeMillis() - windowStartTime);
        if (sleepTime <= 0) {
          sleepTime = 1;
        }
        long sleepStart = System.currentTimeMillis();
        boolean interrupted = false;
        try {
          wait(sleepTime);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          interrupted = true;
        }
        totalSlept += System.currentTimeMillis() - sleepStart;
        if (interrupted) {
          break;
        }
      }
    }
    notifyAll();
    return totalSlept;
  }

  /**
   * Low-rate path for configurations where {@code tokensPerWindow == 0}. In this case we schedule
   * one token every {@code millisPerToken} and block callers until the next token time. Re-checks
   * the clock after every wakeup so a spurious wakeup cannot grant a token early.
   * @return total milliseconds actually slept
   */
  private long consumeLowRate(long tokens) {
    long totalSlept = 0;
    boolean interrupted = false;
    for (long i = 0; i < tokens && !interrupted; i++) {
      long now = System.currentTimeMillis();
      while (now < nextTokenTimeMs) {
        long sleepStart = now;
        try {
          wait(nextTokenTimeMs - now);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          interrupted = true;
        }
        now = System.currentTimeMillis();
        totalSlept += now - sleepStart;
        if (interrupted) {
          break;
        }
      }
      if (interrupted) {
        break;
      }
      if (now > nextTokenTimeMs) {
        // The budget was idle past the scheduled time; restart the schedule from now so idle
        // periods do not bank tokens.
        nextTokenTimeMs = now;
      }
      nextTokenTimeMs += millisPerToken;
    }
    notifyAll();
    return totalSlept;
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.devsim;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test verifying that the EBS device layer proxy is correctly installed by
 * MiniDFSCluster and intercepts read and write IO at the DataNode storage level.
 * <p>
 * Starts a single-DataNode MiniDFSCluster with 2 storage volumes and the EBS device layer
 * configured with high bandwidth/IOPS budgets (so throttling does not slow the test) but with
 * device latency disabled. Writes data through HBase, flushes, reads it back via scan and get, and
 * asserts that the device layer metrics reflect the IO.
 */
@Category({ IOTests.class, MediumTests.class })
public class TestEBSDeviceLayer {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestEBSDeviceLayer.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestEBSDeviceLayer.class);

  private static final TableName TABLE_NAME = TableName.valueOf("TestEBSDeviceLayer");
  private static final byte[] FAMILY = Bytes.toBytes("f");
  private static final byte[] QUALIFIER = Bytes.toBytes("q");
  // 200 rows x 4KB values: enough data to guarantee real device IO on flush and scan.
  private static final int NUM_ROWS = 200;
  private static final int VALUE_SIZE = 4096;
  // Number of simulated EBS volumes per DataNode (storagesPerDatanode).
  private static final int NUM_VOLUMES = 2;

  private static HBaseTestingUtil UTIL;

  @BeforeClass
  public static void setUp() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Disable region splits so all rows stay in a single region on the single RegionServer.
    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
      "org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy");
    // Short-circuit reads bypass the DataNode read path, which would skip the device layer
    // intercepts this test asserts on.
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    // Generous budgets so throttling never slows the test; latency disabled.
    // NOTE(review): parameter meanings assumed to be (bandwidth, IOPS, latency, coalesce-KB,
    // instance cap) — confirm against EBSDevice.configure's signature.
    EBSDevice.configure(conf, 1000, 100000, 0, 1024, 0);

    UTIL = new HBaseTestingUtil(conf);
    UTIL.startMiniZKCluster();
    // Build the DFS cluster manually so storagesPerDatanode (number of simulated volumes) can be
    // set; the device layer factory configured above wraps each DataNode's dataset at startup.
    MiniDFSCluster dfsCluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).storagesPerDatanode(NUM_VOLUMES).build();
    dfsCluster.waitClusterUp();
    UTIL.setDFSCluster(dfsCluster);
    UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    // Stop the device layer (metrics reporter etc.) before tearing down the cluster.
    EBSDevice.shutdown();
    if (UTIL != null) {
      UTIL.shutdownMiniCluster();
    }
  }

  @Test
  public void testDeviceLayerInterceptsIO() throws Exception {
    // Phase 0: the factory should have registered exactly one DataNode with NUM_VOLUMES volumes.
    assertEquals("Expected 1 DataNode registered with EBSDevice", 1, EBSDevice.getNumDataNodes());
    EBSDevice.DataNodeContext dnCtx = EBSDevice.getDataNodeContext(0);
    assertEquals("Expected " + NUM_VOLUMES + " volumes", NUM_VOLUMES, dnCtx.getNumVolumes());

    // Small block size forces multiple HFile blocks, exercising more read IO on scan.
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(
      ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(64 * 1024).build()).build();
    UTIL.getAdmin().createTable(desc);
    UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);

    EBSDevice.resetMetrics();

    // Phase 1: write NUM_ROWS rows, then flush so the memstore is persisted to HDFS,
    // driving DataNode write IO through the device layer.
    byte[] value = new byte[VALUE_SIZE];
    Bytes.random(value);
    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
      for (int i = 0; i < NUM_ROWS; i++) {
        Put put = new Put(Bytes.toBytes(String.format("row-%05d", i)));
        put.addColumn(FAMILY, QUALIFIER, value);
        table.put(put);
      }
    }

    UTIL.getAdmin().flush(TABLE_NAME);
    waitForFlush();

    long writeBytesAfterFlush = EBSDevice.getTotalBytesWritten();
    long writeInterceptsAfterFlush = EBSDevice.getWriteInterceptCount();
    LOG.info("After write+flush: bytesWritten={}, writeIntercepts={}, deviceWriteOps={}",
      writeBytesAfterFlush, writeInterceptsAfterFlush, EBSDevice.getDeviceWriteOps());

    assertTrue("Expected write intercepts after flush, got " + writeInterceptsAfterFlush,
      writeInterceptsAfterFlush > 0);
    assertTrue("Expected bytes written > 0 after flush, got " + writeBytesAfterFlush,
      writeBytesAfterFlush > 0);

    // Phase 2: full scan must read every row back from the flushed HFiles (block cache is cold),
    // driving DataNode read IO through the device layer.
    EBSDevice.resetMetrics();
    int rowCount = 0;
    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
      try (ResultScanner scanner = table.getScanner(new Scan())) {
        Result result;
        while ((result = scanner.next()) != null) {
          assertTrue("Row should not be empty", !result.isEmpty());
          rowCount++;
        }
      }
    }
    assertEquals("Expected to read back all rows", NUM_ROWS, rowCount);

    long readBytes = EBSDevice.getTotalBytesRead();
    long readIntercepts = EBSDevice.getReadInterceptCount();
    long deviceReadOps = EBSDevice.getDeviceReadOps();
    long readOps = EBSDevice.getReadOpCount();
    LOG.info("After scan: bytesRead={}, readIntercepts={}, appReadOps={}, deviceReadOps={}",
      readBytes, readIntercepts, readOps, deviceReadOps);

    assertTrue("Expected read intercepts after scan, got " + readIntercepts, readIntercepts > 0);
    assertTrue("Expected bytes read > 0 after scan, got " + readBytes, readBytes > 0);
    assertTrue("Expected device read ops > 0 (IOPS coalescing should still produce ops), got "
      + deviceReadOps, deviceReadOps > 0);

    // Phase 3: a point Get should also be intercepted on the read path.
    EBSDevice.resetMetrics();
    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
      Result result = table.get(new Get(Bytes.toBytes("row-00050")));
      assertTrue("Get should return data", !result.isEmpty());
    }
    long getReadIntercepts = EBSDevice.getReadInterceptCount();
    LOG.info("After get: readIntercepts={}, bytesRead={}", getReadIntercepts,
      EBSDevice.getTotalBytesRead());
    assertTrue("Expected read intercepts after get, got " + getReadIntercepts,
      getReadIntercepts > 0);

    // Phase 4: nearly every intercepted block must have resolved to a simulated volume;
    // otherwise the storage-id mapping between proxy and real volumes is broken.
    long totalIntercepts = EBSDevice.getReadInterceptCount() + EBSDevice.getWriteInterceptCount();
    long unresolved = EBSDevice.getUnresolvedVolumeCount();
    if (totalIntercepts > 0) {
      double unresolvedRatio = (double) unresolved / totalIntercepts;
      assertTrue("Unresolved volume ratio too high: " + unresolvedRatio + " (unresolved="
        + unresolved + ", total=" + totalIntercepts + ")", unresolvedRatio <= 0.01);
    }

    LOG.info("Per-volume stats: {}", EBSDevice.getPerVolumeStats());
  }

  /**
   * Polls until every region of the test table reports an empty memstore, i.e. the flush
   * requested above has fully completed, or fails after 60 seconds.
   * @throws IOException if the flush does not complete within the timeout
   */
  private void waitForFlush() throws Exception {
    long deadline = System.currentTimeMillis() + 60000;
    while (System.currentTimeMillis() < deadline) {
      long memstoreSize = 0;
      for (HRegion region : UTIL.getMiniHBaseCluster().getRegionServer(0).getRegions(TABLE_NAME)) {
        memstoreSize += region.getMemStoreDataSize();
      }
      if (memstoreSize == 0) {
        return;
      }
      Thread.sleep(500);
    }
    throw new IOException("Flush did not complete within timeout");
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.devsim;

import static org.junit.Assert.assertTrue;

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Unit tests for {@link IOBudget} token-bucket throttling.
 */
@Category({ IOTests.class, SmallTests.class })
public class TestIOBudget {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestIOBudget.class);

  /**
   * Rates below one token per window take the dedicated low-rate path; verify it throttles
   * without deadlocking. At 2 tokens/sec the second token should wait roughly 500ms.
   */
  @Test
  public void testLowRateModeDoesNotDeadlock() {
    IOBudget budget = new IOBudget(2, 100);
    budget.consume(1);
    // Measure elapsed time with the monotonic clock: System.currentTimeMillis can jump under
    // NTP adjustment and make timing assertions flaky.
    long start = System.nanoTime();
    budget.consume(1);
    long elapsedSecondTokenMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

    // Generous bounds (300ms..3s around the nominal 500ms) keep this robust on loaded CI hosts.
    assertTrue("Expected low-rate budget to throttle second token, elapsed=" + elapsedSecondTokenMs,
      elapsedSecondTokenMs >= 300);
    assertTrue("Unexpectedly long low-rate throttle delay, elapsed=" + elapsedSecondTokenMs,
      elapsedSecondTokenMs < 3000);
  }

  /**
   * At 100 tokens/sec with 100ms windows (10 tokens per window), a single 15-token consume must
   * exceed the window budget and report a positive sleep time.
   */
  @Test
  public void testRegularWindowModeStillThrottles() {
    IOBudget budget = new IOBudget(100, 100);
    long elapsedMs = budget.consume(15);
    assertTrue("Expected consume to sleep when exceeding window budget", elapsedMs > 0);
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.devsim;

import java.io.IOException;
import java.io.InputStream;

/**
 * Throttling decorator for the block data {@link InputStream} returned by the DataNode's
 * {@code getBlockInputStream}. Every byte delivered to the reader is charged against the owning
 * volume's {@link EBSVolumeDevice} BW and IOPS budgets, and the current offset within the block
 * is tracked so the volume can detect and coalesce sequential IO.
 * <p>
 * When a DataNode context is supplied, reads are additionally charged against the per-DataNode
 * instance-level BW budget.
 */
public class ThrottledBlockInputStream extends InputStream {

  private final InputStream wrapped;
  private final EBSVolumeDevice device;
  private final EBSDevice.DataNodeContext context;
  // Current offset within the block; advanced on every read/skip for coalescing detection.
  private long pos;

  /**
   * @param delegate the real block input stream from FsDatasetImpl
   * @param volume the EBS volume device this block resides on
   * @param dnContext the DataNode context for instance-level BW budget (may be null)
   * @param offset the initial seek offset into the block
   */
  public ThrottledBlockInputStream(InputStream delegate, EBSVolumeDevice volume,
    EBSDevice.DataNodeContext dnContext, long offset) {
    this.wrapped = delegate;
    this.device = volume;
    this.context = dnContext;
    this.pos = offset;
  }

  @Override
  public int read() throws IOException {
    final int value = wrapped.read();
    if (value != -1) {
      charge(1);
    }
    return value;
  }

  @Override
  public int read(byte[] buf, int off, int len) throws IOException {
    final int count = wrapped.read(buf, off, len);
    if (count > 0) {
      charge(count);
    }
    return count;
  }

  @Override
  public long skip(long n) throws IOException {
    final long advanced = wrapped.skip(n);
    if (advanced > 0) {
      // A seek breaks any sequential run: settle the pending coalesced IOPS charge now.
      device.flushPendingReadIops(this);
      pos += advanced;
    }
    return advanced;
  }

  @Override
  public int available() throws IOException {
    return wrapped.available();
  }

  @Override
  public void close() throws IOException {
    // Settle any partially accumulated sequential IO before releasing the underlying stream.
    device.flushPendingReadIops(this);
    wrapped.close();
  }

  /** Charge the volume (and optional instance budget) for bytes read, then advance the offset. */
  private void charge(int bytes) {
    final long startOffset = pos;
    pos += bytes;
    device.accountRead(this, startOffset, bytes);
    if (context != null) {
      context.consumeInstanceBw(bytes);
    }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.devsim;

import java.io.InputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dynamic proxy wrapping a real {@code FsDatasetImpl} to apply EBS volume emulation at the
 * DataNode storage level. One proxy is created per DataNode by {@link ThrottledFsDatasetFactory}.
 * <p>
 * Models the Linux ext4 page cache behavior: application writes go to the page cache with no EBS
 * charge. EBS is engaged only when dirty pages are flushed to the block device, which happens at
 * these interception points on {@code FsDatasetSpi}:
 * <ul>
 * <li>{@code getBlockInputStream(ExtendedBlock, long)} -- wraps the returned InputStream with
 * {@link ThrottledBlockInputStream} for per-byte read throttling</li>
 * <li>{@code submitBackgroundSyncFileRangeRequest(ExtendedBlock, ..., nbytes, ...)} -- charges
 * {@code nbytes} against the volume's BW and IOPS budgets, modeling the async writeback of dirty
 * pages triggered by {@code sync_file_range(SYNC_FILE_RANGE_WRITE)} every ~8MB</li>
 * <li>{@code finalizeBlock(ExtendedBlock, boolean)} -- charges the remaining unflushed bytes
 * (block size minus bytes already charged via sync_file_range) against the volume's budgets,
 * modeling the {@code fsync()} at block finalization</li>
 * </ul>
 * Cumulative synced bytes are tracked per block to avoid double-counting between the intermediate
 * sync_file_range charges and the final fsync charge.
 * <p>
 * All other methods are delegated transparently to the inner {@code FsDatasetImpl}.
 */
public final class ThrottledFsDataset {

  private static final Logger LOG = LoggerFactory.getLogger(ThrottledFsDataset.class);

  private ThrottledFsDataset() {
    // static utility class, not instantiable
  }

  /**
   * Create a dynamic proxy wrapping the given {@code FsDatasetSpi} delegate with EBS throttling.
   * @param delegate the real FsDatasetImpl
   * @param dnId DataNode identifier for metrics registration
   * @param conf Hadoop configuration with EBS parameters
   * @return a proxy implementing FsDatasetSpi with throttling
   */
  @SuppressWarnings("unchecked")
  public static <T extends FsDatasetSpi<?>> T wrap(FsDatasetSpi<?> delegate, String dnId,
    Configuration conf) {

    // Zero/absent values mean "unlimited" / "disabled" for the corresponding feature.
    long bytesPerSec = conf.getLong(EBSDevice.IO_BUDGET_BYTES_PER_SEC_KEY, 0);
    int iops = conf.getInt(EBSDevice.IO_BUDGET_IOPS_KEY, 0);
    int windowMs = conf.getInt(EBSDevice.IO_BUDGET_WINDOW_MS_KEY, EBSDevice.DEFAULT_WINDOW_MS);
    int maxIoKb = conf.getInt(EBSDevice.IO_MAX_IO_SIZE_KB_KEY, EBSDevice.DEFAULT_MAX_IO_SIZE_KB);
    int maxIoSizeBytes = maxIoKb * 1024;
    int instMbps = conf.getInt(EBSDevice.IO_INSTANCE_MBPS_KEY, EBSDevice.DEFAULT_INSTANCE_MBPS);
    int deviceLatencyUs =
      conf.getInt(EBSDevice.IO_DEVICE_LATENCY_US_KEY, EBSDevice.DEFAULT_DEVICE_LATENCY_US);

    // Enumerate the delegate's real volumes so each can be paired with an EBSVolumeDevice.
    List<FsVolumeSpi> realVolumes = new ArrayList<>();
    try (FsDatasetSpi.FsVolumeReferences refs = delegate.getFsVolumeReferences()) {
      for (FsVolumeSpi v : refs) {
        realVolumes.add(v);
      }
    } catch (Exception e) {
      throw new RuntimeException("Failed to enumerate volumes from delegate dataset", e);
    }

    // One simulated EBS volume per real volume, each with independent token buckets, keyed by
    // storage ID so intercepted blocks can be mapped back via delegate.getVolume(block).
    int numVolumes = realVolumes.size();
    EBSVolumeDevice[] ebsVolumes = new EBSVolumeDevice[numVolumes];
    Map<String, EBSVolumeDevice> storageIdToVolume = new HashMap<>();
    for (int i = 0; i < numVolumes; i++) {
      IOBudget volBw = bytesPerSec > 0 ? new IOBudget(bytesPerSec, windowMs) : null;
      IOBudget volIops = iops > 0 ? new IOBudget(iops, windowMs) : null;
      ebsVolumes[i] = new EBSVolumeDevice(i, volBw, volIops, maxIoSizeBytes, deviceLatencyUs);
      String storageId = realVolumes.get(i).getStorageID();
      storageIdToVolume.put(storageId, ebsVolumes[i]);
    }

    // Optional host-level aggregate bandwidth cap shared across all volumes of this DataNode.
    IOBudget instanceBw = null;
    if (instMbps > 0) {
      long instBytesPerSec = (long) instMbps * 1024L * 1024L;
      instanceBw = new IOBudget(instBytesPerSec, windowMs);
    }

    EBSDevice.DataNodeContext dnContext =
      EBSDevice.register(dnId, ebsVolumes, instanceBw, bytesPerSec, iops, deviceLatencyUs);

    int reportIntervalSec = conf.getInt(EBSDevice.IO_BUDGET_REPORT_INTERVAL_SEC_KEY,
      EBSDevice.DEFAULT_REPORT_INTERVAL_SEC);
    EBSDevice.startMetricsReporter(reportIntervalSec);

    LOG.info(
      "ThrottledFsDataset: DN {} wrapped with {} EBS volumes "
        + "(BW={} MB/s, IOPS={}, coalesce={}KB, latency={}us, instance_cap={} MB/s)",
      dnId, numVolumes,
      bytesPerSec > 0 ? String.format("%.1f", bytesPerSec / (1024.0 * 1024.0)) : "unlimited",
      iops > 0 ? iops : "unlimited", maxIoKb > 0 ? maxIoKb : "off",
      deviceLatencyUs > 0 ? deviceLatencyUs : "off", instMbps > 0 ? instMbps : "unlimited");

    // Guard against Hadoop version drift: warn loudly if an interception target no longer exists
    // on the delegate, since the proxy would then silently stop throttling that path.
    String[] interceptedMethods =
      { "getBlockInputStream", "submitBackgroundSyncFileRangeRequest", "finalizeBlock" };
    for (String m : interceptedMethods) {
      boolean found = false;
      for (Method dm : delegate.getClass().getMethods()) {
        if (dm.getName().equals(m)) {
          found = true;
          break;
        }
      }
      if (!found) {
        LOG.warn("EBS interception target method '{}' not found on {}; "
          + "throttling for this path will be inactive", m, delegate.getClass().getName());
      }
    }

    EBSInvocationHandler handler = new EBSInvocationHandler(delegate, storageIdToVolume, dnContext);

    // Implement every interface of the delegate so the proxy is a drop-in replacement.
    Set<Class<?>> interfaces = new HashSet<>();
    collectInterfaces(delegate.getClass(), interfaces);
    // Ensure FsDatasetSpi is always included
    interfaces.add(FsDatasetSpi.class);

    return (T) Proxy.newProxyInstance(delegate.getClass().getClassLoader(),
      interfaces.toArray(new Class<?>[0]), handler);
  }

  /** Recursively collect all interfaces implemented by {@code clazz} and its supertypes. */
  private static void collectInterfaces(Class<?> clazz, Set<Class<?>> result) {
    if (clazz == null || clazz == Object.class) {
      return;
    }
    for (Class<?> iface : clazz.getInterfaces()) {
      if (result.add(iface)) {
        collectInterfaces(iface, result);
      }
    }
    collectInterfaces(clazz.getSuperclass(), result);
  }

  /** Invocation handler performing the actual interception and budget accounting. */
  private static class EBSInvocationHandler implements InvocationHandler {

    private final FsDatasetSpi<?> delegate;
    private final Map<String, EBSVolumeDevice> storageIdToVolume;
    private final EBSDevice.DataNodeContext dnContext;
    // Cumulative bytes already charged per block id via sync_file_range, so finalizeBlock only
    // charges the unflushed remainder. NOTE(review): keyed by block id alone — assumes a single
    // block pool (true under MiniDFSCluster); confirm before reuse with HDFS federation.
    private final ConcurrentHashMap<Long, Long> syncedBytesPerBlock = new ConcurrentHashMap<>();

    EBSInvocationHandler(FsDatasetSpi<?> delegate, Map<String, EBSVolumeDevice> storageIdToVolume,
      EBSDevice.DataNodeContext dnContext) {
      this.delegate = delegate;
      this.storageIdToVolume = storageIdToVolume;
      this.dnContext = dnContext;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      String name = method.getName();

      // Read path: wrap the block input stream with throttling
      if (
        "getBlockInputStream".equals(name) && args != null && args.length >= 2
          && args[0] instanceof ExtendedBlock
      ) {
        return handleGetBlockInputStream(method, args);
      }

      // Write path: intermediate async writeback of dirty pages (sync_file_range)
      if (
        "submitBackgroundSyncFileRangeRequest".equals(name) && args != null && args.length >= 4
          && args[0] instanceof ExtendedBlock
      ) {
        return handleSyncFileRange(method, args);
      }

      // Write path: final fsync at block finalization (charge remaining unflushed bytes)
      if (
        "finalizeBlock".equals(name) && args != null && args.length >= 1
          && args[0] instanceof ExtendedBlock
      ) {
        return handleFinalizeBlock(method, args);
      }

      // Object methods must be answered by the proxy itself, not the delegate.
      if ("toString".equals(name) && (args == null || args.length == 0)) {
        return "ThrottledFsDataset[" + delegate.toString() + "]";
      }
      if ("hashCode".equals(name) && (args == null || args.length == 0)) {
        return System.identityHashCode(proxy);
      }
      if ("equals".equals(name) && args != null && args.length == 1) {
        return proxy == args[0];
      }

      return delegateInvoke(method, args);
    }

    /**
     * Intercepts the DataNode read path: returns the delegate's stream wrapped with
     * {@link ThrottledBlockInputStream} when the block's volume can be resolved, otherwise
     * records the miss and returns the stream unthrottled.
     */
    private Object handleGetBlockInputStream(Method method, Object[] args) throws Throwable {
      ExtendedBlock block = (ExtendedBlock) args[0];
      long offset = (Long) args[1];
      EBSDevice.recordReadIntercept();

      Object result = delegateInvoke(method, args);
      if (result instanceof InputStream) {
        EBSVolumeDevice vol = resolveVolume(block);
        if (vol != null) {
          return new ThrottledBlockInputStream((InputStream) result, vol, dnContext, offset);
        }
        EBSDevice.recordUnresolvedVolume();
      }
      return result;
    }

    /**
     * Intercepts the DataNode's async dirty page writeback. In production, BlockReceiver calls
     * this every ~8MB ({@code CACHE_DROP_LAG_BYTES}) to issue
     * {@code sync_file_range(fd, offset, nbytes, SYNC_FILE_RANGE_WRITE)}, which triggers
     * asynchronous writeback of dirty pages from the page cache to the EBS device.
     * <p>
     * We charge {@code nbytes} against the volume's BW and IOPS budgets here, and track the
     * cumulative synced bytes per block so that {@code finalizeBlock} only charges the remaining
     * unflushed delta.
     */
    private Object handleSyncFileRange(Method method, Object[] args) throws Throwable {
      ExtendedBlock block = (ExtendedBlock) args[0];
      // args: (ExtendedBlock, ReplicaOutputStreams, long offset, long nbytes, int flags)
      long nbytes = args[3] instanceof Number ? ((Number) args[3]).longValue() : 0;
      EBSDevice.recordWriteIntercept();
      EBSVolumeDevice vol = resolveVolume(block);
      if (vol != null && nbytes > 0) {
        vol.accountBulkWrite(nbytes);
        dnContext.consumeInstanceBw(nbytes);
        syncedBytesPerBlock.merge(block.getBlockId(), nbytes, Long::sum);
      } else if (vol == null) {
        EBSDevice.recordUnresolvedVolume();
      }
      return delegateInvoke(method, args);
    }

    /**
     * Intercepts block finalization, which in the DataNode follows immediately after
     * {@code BlockReceiver.close()} (which includes {@code syncDataOut()} / fsync). Charges only
     * the remaining bytes not already flushed via {@code sync_file_range} calls.
     */
    private Object handleFinalizeBlock(Method method, Object[] args) throws Throwable {
      ExtendedBlock block = (ExtendedBlock) args[0];
      EBSDevice.recordWriteIntercept();
      EBSVolumeDevice vol = resolveVolume(block);
      if (vol != null) {
        long blockBytes = block.getNumBytes();
        long alreadySynced = syncedBytesPerBlock.getOrDefault(block.getBlockId(), 0L);
        long remaining = Math.max(0, blockBytes - alreadySynced);
        if (remaining > 0) {
          vol.accountBulkWrite(remaining);
          dnContext.consumeInstanceBw(remaining);
        }
        // The block is finalized; drop its writeback bookkeeping.
        syncedBytesPerBlock.remove(block.getBlockId());
      } else {
        EBSDevice.recordUnresolvedVolume();
      }
      return delegateInvoke(method, args);
    }

    /**
     * Map a block to its simulated volume using the delegate's real placement decision
     * ({@code getVolume(block)}), keyed by storage ID. Returns null when resolution fails.
     */
    private EBSVolumeDevice resolveVolume(ExtendedBlock block) {
      try {
        FsVolumeSpi vol = delegate.getVolume(block);
        if (vol != null) {
          return storageIdToVolume.get(vol.getStorageID());
        }
      } catch (Exception e) {
        LOG.debug("Could not resolve volume for block {}", block, e);
      }
      return null;
    }

    /** Invoke on the delegate, unwrapping reflection's InvocationTargetException. */
    private Object delegateInvoke(Method method, Object[] args) throws Throwable {
      try {
        return method.invoke(delegate, args);
      } catch (InvocationTargetException e) {
        // Rethrow the delegate's real exception so callers see the original type.
        throw e.getCause();
      }
    }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.devsim;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory that creates {@link ThrottledFsDataset} proxies wrapping the standard
 * {@code FsDatasetImpl}. Configured via:
 *
 * <pre>{@code
 * conf.set("dfs.datanode.fsdataset.factory", ThrottledFsDatasetFactory.class.getName());
 * }</pre>
 *
 * Hadoop calls {@link #newInstance} once per DataNode during startup. Each invocation creates an
 * independent proxy with its own set of {@link EBSVolumeDevice} instances.
 */
@SuppressWarnings("rawtypes")
public class ThrottledFsDatasetFactory extends FsDatasetSpi.Factory {

  private static final Logger LOG = LoggerFactory.getLogger(ThrottledFsDatasetFactory.class);

  /** FQCN of Hadoop's default factory; loaded reflectively to avoid compile-time coupling. */
  private static final String DEFAULT_FACTORY_CLASS =
    "org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory";

  /**
   * Build the real {@code FsDatasetImpl} via Hadoop's default factory, then wrap it with the EBS
   * device layer proxy.
   * @throws IOException if the default factory cannot be loaded or fails to create the dataset
   */
  @Override
  @SuppressWarnings("unchecked")
  public FsDatasetSpi newInstance(DataNode datanode, DataStorage storage, Configuration conf)
    throws IOException {
    FsDatasetSpi.Factory defaultFactory = loadDefaultFactory();
    FsDatasetSpi inner = defaultFactory.newInstance(datanode, storage, conf);
    // Fall back to an identity-hash name if the DataNode has no id yet at this point in startup.
    String dnId = datanode.getDatanodeId() != null
      ? datanode.getDatanodeId().getDatanodeUuid()
      : "DN-" + System.identityHashCode(datanode);
    LOG.info("ThrottledFsDatasetFactory: creating EBS device layer for DataNode {}", dnId);
    return ThrottledFsDataset.wrap(inner, dnId, conf);
  }

  /** Reflectively load Hadoop's default FsDataset factory. */
  private static FsDatasetSpi.Factory loadDefaultFactory() throws IOException {
    try {
      Class<?> clazz = Class.forName(DEFAULT_FACTORY_CLASS);
      return (FsDatasetSpi.Factory) clazz.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IOException("Failed to load default FsDataset factory: " + DEFAULT_FACTORY_CLASS,
        e);
    }
  }
}