Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.replication.reader.ReplicationLogReplayService;
import org.apache.phoenix.schema.CompiledConditionalTTLExpression;
import org.apache.phoenix.schema.CompiledTTLExpression;
import org.apache.phoenix.schema.ConditionalTTLExpression;
Expand Down Expand Up @@ -139,6 +140,7 @@ public class CompactionScanner implements InternalScanner {
private final Store store;
private final RegionCoprocessorEnvironment env;
private long maxLookbackWindowStart;
private final long replicationConsistencyPoint;
private final long maxLookbackInMillis;
private int minVersion;
private int maxVersion;
Expand Down Expand Up @@ -199,8 +201,21 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store,
this.maxLookbackWindowStart = this.maxLookbackInMillis == 0
? compactionTime
: compactionTime - (this.maxLookbackInMillis + 1);
ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
Configuration conf = env.getConfiguration();
this.major = major && !forceMinorCompaction;
boolean replayEnabled =
conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED,
ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED);
boolean guardEnabled =
conf.getBoolean(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED,
ReplicationLogReplayService.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED);
if (this.major && replayEnabled && guardEnabled) {
this.replicationConsistencyPoint =
ReplicationLogReplayService.resolveConsistencyPoint(conf, tableName, columnFamilyName);
} else {
this.replicationConsistencyPoint = Long.MAX_VALUE;
}
ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
this.minVersion = cfd.getMinVersions();
this.maxVersion = cfd.getMaxVersions();
this.keepDeletedCells = keepDeleted ? KeepDeletedCells.TTL : cfd.getKeepDeletedCells();
Expand Down Expand Up @@ -1658,6 +1673,8 @@ private void setTTL(long ttlInSecs) {
this.ttl = Math.max(ttlInSecs * 1000, maxLookbackInMillis + 1);
this.ttlWindowStart = ttlInSecs == HConstants.FOREVER ? 1 : compactionTime - ttl;
this.maxLookbackWindowStartForRow = Math.max(ttlWindowStart, maxLookbackWindowStart);
this.maxLookbackWindowStartForRow =
Math.min(this.maxLookbackWindowStartForRow, replicationConsistencyPoint);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(String.format("RowContext:- (ttlWindowStart=%d, maxLookbackWindowStart=%d)",
ttlWindowStart, maxLookbackWindowStart));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.Supplier;
import org.apache.phoenix.thirdparty.com.google.common.base.Suppliers;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
Expand All @@ -50,6 +53,17 @@ public class ReplicationLogReplayService {
*/
public static final boolean DEFAULT_REPLICATION_REPLAY_ENABLED = false;

/**
* Configuration key for enabling/disabling the replication compaction guard.
* <p>
* CompactionScanner reads this flag together with PHOENIX_REPLICATION_REPLAY_ENABLED. Only when
* both are true and the compaction is a major compaction does it resolve the replication
* consistency point and cap each row's max-lookback window start at that point. When this flag is
* false the scanner uses Long.MAX_VALUE instead, so the cap has no effect.
* <p>
* NOTE(review): turning this off on a cluster that replays replication logs removes that cap;
* confirm the operational impact before disabling it.
*/
public static final String REPLICATION_COMPACTION_GUARD_ENABLED =
"phoenix.replication.compaction.guard.enabled";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I commented on this elsewhere, but Claude called this a "foot gun", lol.

I'd argue the flag is a foot-gun: setting phoenix.replication.compaction.guard.enabled=false on a standby silently re-introduces exactly the data-desync bug this PR wants to fix.


/**
* Default value for the replication compaction guard flag
* ({@link #REPLICATION_COMPACTION_GUARD_ENABLED}). The flag defaults to {@code true}, i.e. the
* guard stays on unless the configuration explicitly sets it to false.
*/
public static final boolean DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED = true;

/**
* Number of threads in the executor pool for the replication replay service
*/
Expand Down Expand Up @@ -77,14 +91,36 @@ public class ReplicationLogReplayService {
*/
public static final int DEFAULT_REPLICATION_REPLAY_SERVICE_EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS = 30;

// Expiration, in seconds, handed to Suppliers.memoizeWithExpiration when the constructors build
// the cached consistency-point supplier.
private static final long CONSISTENCY_POINT_CACHE_TTL_SECONDS = 30;

// Singleton reference returned by getInstance; the *ForTesting hooks overwrite or clear it.
private static volatile ReplicationLogReplayService instance;

// Configuration the service was created with; null for instances built by the testing constructors.
private final Configuration conf;
private ScheduledExecutorService scheduler;
private volatile boolean isRunning = false;
// Source of the consistency point read by resolveConsistencyPoint.
private final Supplier<Long> cachedConsistencyPoint;

private ReplicationLogReplayService(final Configuration conf) {
this.conf = conf;
this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(() -> {
try {
return getConsistencyPoint();
} catch (Exception e) {
LOG.warn("Failed to refresh cached consistency point", e);
return 0L;
}
}, CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS);
}

private ReplicationLogReplayService(long fixedConsistencyPoint) {
this.conf = null;
this.cachedConsistencyPoint = () -> fixedConsistencyPoint;
}

private ReplicationLogReplayService(Supplier<Long> supplier) {
this.conf = null;
this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(
supplier, CONSISTENCY_POINT_CACHE_TTL_SECONDS, TimeUnit.SECONDS);
}

/**
Expand All @@ -105,6 +141,21 @@ public static ReplicationLogReplayService getInstance(Configuration conf) throws
return instance;
}

/**
 * Test hook: replaces the singleton with an instance that always reports the given consistency
 * point.
 *
 * @param fixedConsistencyPoint consistency point the replacement instance reports
 */
@VisibleForTesting
public static void setConsistencyPointForTesting(long fixedConsistencyPoint) {
  final ReplicationLogReplayService fixedPointService =
    new ReplicationLogReplayService(fixedConsistencyPoint);
  instance = fixedPointService;
}

/**
 * Test hook: replaces the singleton with an instance that reads its consistency point from the
 * given supplier (through the expiring cache set up by the supplier-based constructor).
 *
 * @param supplier source of the consistency point for the replacement instance
 */
@VisibleForTesting
public static void setConsistencyPointSupplierForTesting(Supplier<Long> supplier) {
  final ReplicationLogReplayService supplierBackedService =
    new ReplicationLogReplayService(supplier);
  instance = supplierBackedService;
}

/**
* Test hook: clears the singleton reference, presumably so that a later getInstance call creates
* a fresh instance (confirm against getInstance).
* <p>
* NOTE(review): this only drops the reference; nothing here stops the previous instance. Confirm
* that callers never reset while a started service is still running.
*/
@VisibleForTesting
public static void resetInstanceForTesting() {
instance = null;
}
Comment on lines +155 to +157

/**
* Starts the replication log replay service by initializing the scheduler and scheduling periodic
* replay operations for each HA Group.
Expand Down Expand Up @@ -229,6 +280,27 @@ protected long getConsistencyPoint() throws IOException, SQLException {
return consistencyPoint;
}

/**
 * Returns the replication consistency point for use by the compaction guard.
 * <p>
 * The value is read from the singleton's cached supplier, whose entries are reused for
 * {@code CONSISTENCY_POINT_CACHE_TTL_SECONDS} seconds so that bursts of compactions do not each
 * trigger a fresh lookup. This method never throws: any exception raised while obtaining the
 * instance or reading the value is logged at WARN and reported as {@code 0L}, which the caller
 * treats as "retain all delete markers".
 *
 * @param conf             configuration passed to {@link #getInstance(Configuration)}
 * @param tableName        table being compacted; used only in log messages
 * @param columnFamilyName store being compacted; used only in log messages
 * @return the cached consistency point, or {@code 0L} if it could not be obtained
 */
public static long resolveConsistencyPoint(Configuration conf, String tableName,
  String columnFamilyName) {
  long resolvedPoint;
  try {
    resolvedPoint = getInstance(conf).cachedConsistencyPoint.get();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Replication guard: table={} store={} consistencyPoint={}", tableName,
        columnFamilyName, resolvedPoint);
    }
  } catch (Exception e) {
    // Fail safe: fall back to the most conservative value rather than failing the compaction.
    LOG.warn("Replication guard: consistency point unavailable for table={} store={}."
      + " Retaining all delete markers.", tableName, columnFamilyName, e);
    resolvedPoint = 0L;
  }
  return resolvedPoint;
}

/** Returns the list of HA groups on the cluster */
protected List<String> getReplicationGroups() throws SQLException {
return HAGroupStoreManager.getInstance(conf).getHAGroupNames();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.replication.reader;

import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY;
import static org.apache.phoenix.util.TestUtil.assertRawRowCount;
import static org.junit.Assert.assertFalse;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;

/**
 * Integration test for the "guard disabled" configuration of the replication compaction guard:
 * with replication replay enabled but the guard explicitly switched off, major compaction must
 * behave exactly as it does without the guard.
 */
@Category(NeedsOwnMiniClusterTest.class)
public class CompactionReplicationGuardDisabledIT extends BaseTest {

  private static final int MAX_LOOKBACK_AGE = 15;
  private static final int ROWS_POPULATED = 2;
  private ManualEnvironmentEdge injectEdge;

  /** Starts the mini cluster with replay enabled and the compaction guard disabled. */
  @BeforeClass
  public static synchronized void doSetup() throws Exception {
    Map<String, String> clusterProps = Maps.newHashMapWithExpectedSize(5);
    clusterProps.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, String.valueOf(MAX_LOOKBACK_AGE));
    clusterProps.put(QueryServices.PHOENIX_COMPACTION_ENABLED, String.valueOf(true));
    clusterProps.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED,
      String.valueOf(true));
    clusterProps.put(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED,
      String.valueOf(false));
    clusterProps.put("hbase.procedure.remote.dispatcher.delay.msec", "0");
    setUpTestDriver(new ReadOnlyProps(clusterProps.entrySet().iterator()));
  }

  /** Installs a manually advanced clock, seeded with the current wall-clock time. */
  @Before
  public void beforeTest() throws Exception {
    EnvironmentEdgeManager.reset();
    ManualEnvironmentEdge manualClock = new ManualEnvironmentEdge();
    manualClock.setValue(System.currentTimeMillis());
    injectEdge = manualClock;
    EnvironmentEdgeManager.injectEdge(manualClock);
  }

  /** Clears the injected service instance and clock, then checks for leaked store ref counts. */
  @After
  public synchronized void afterTest() throws Exception {
    ReplicationLogReplayService.resetInstanceForTesting();
    EnvironmentEdgeManager.reset();
    assertFalse("refCount leaked", isAnyStoreRefCountLeaked());
  }

  /**
   * With the guard disabled, a delete marker older than max lookback is purged by major
   * compaction, even though the injected consistency point predates the delete and would have
   * kept the marker had the guard been enabled.
   */
  @Test(timeout = 120000L)
  public void testGuardDisabledDeleteMarkersPurgedByMaxLookback() throws Exception {
    try (Connection conn = DriverManager.getConnection(getUrl())) {
      final String tableName = generateUniqueName();
      createTable(tableName);
      final TableName hbaseTable = TableName.valueOf(tableName);
      populateTable(tableName);

      injectEdge.incrementValue(1);
      final long timeBeforeDelete = EnvironmentEdgeManager.currentTimeMillis();

      // Remove row 'a', leaving a delete marker behind
      conn.createStatement().execute("DELETE FROM " + tableName + " WHERE id = 'a'");
      conn.commit();
      injectEdge.incrementValue(1);

      // Consistency point lies BEFORE the delete: an enabled guard would keep the marker
      ReplicationLogReplayService.setConsistencyPointForTesting(timeBeforeDelete - 1);

      // Move the clock beyond the max lookback window
      injectEdge.incrementValue((MAX_LOOKBACK_AGE + 1) * 1000);

      flush(hbaseTable);
      majorCompact(hbaseTable);

      // Guard is off, so max lookback alone decides and the marker is gone
      assertRawRowCount(conn, hbaseTable, ROWS_POPULATED - 1);
    }
  }

  /** Flushes the memstore of the given table. */
  private void flush(TableName table) throws IOException {
    getUtility().getAdmin().flush(table);
  }

  /** Runs a major compaction on the given table via {@link TestUtil}. */
  private void majorCompact(TableName table) throws Exception {
    TestUtil.majorCompact(getUtility(), table);
  }

  /** Creates the four-column test table on its own connection. */
  private void createTable(String tableName) throws SQLException {
    final String ddl =
      "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, val1 VARCHAR(10),"
        + " val2 VARCHAR(10), val3 VARCHAR(10))";
    try (Connection conn = DriverManager.getConnection(getUrl())) {
      conn.createStatement().execute(ddl);
      conn.commit();
    }
  }

  /** Inserts rows 'a' and 'b', committing after each upsert. */
  private void populateTable(String tableName) throws SQLException {
    final String[] upserts = {
      "UPSERT INTO " + tableName + " VALUES ('a', 'ab', 'abc', 'abcd')",
      "UPSERT INTO " + tableName + " VALUES ('b', 'bc', 'bcd', 'bcde')" };
    try (Connection conn = DriverManager.getConnection(getUrl())) {
      for (String upsert : upserts) {
        conn.createStatement().execute(upsert);
        conn.commit();
      }
    }
  }
}
Loading