diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index adf0ef6e510..e234c3a6752 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -31,6 +31,7 @@ import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Deque; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -54,7 +55,9 @@ import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.ServerMetrics; +import org.apache.zookeeper.server.SessionTracker; import org.apache.zookeeper.server.TxnLogEntry; +import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooTrace; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; @@ -579,6 +582,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { boolean snapshotNeeded = true; boolean syncSnapshot = false; readPacket(qp); + boolean diffSync = qp.getType() == Leader.DIFF; Deque packetsCommitted = new ArrayDeque<>(); Deque packetsNotLogged = new ArrayDeque<>(); @@ -633,6 +637,10 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { } zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); zk.createSessionTracker(); + // DIFF keeps the local tree; clear ephemerals without sessions before applying new transactions. + if (diffSync) { + purgeOrphanedEphemerals(); + } // TODO: Ideally, this should be lastProcessZxid(a.k.a. QuorumPacket::zxid from above), but currently // LearnerHandler does not guarantee this. So, let's be conservative and keep it unchange for now. 
@@ -869,6 +877,60 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
             // New server type need to handle in-flight packets
             throw new UnsupportedOperationException("Unknown server type");
         }
+
+    }
+
+    /**
+     * Removes ephemeral nodes owned by sessions the session tracker does not know about.
+     *
+     * <p>Invoked after a DIFF sync, which keeps the local data tree. A session that still owns
+     * ephemerals is skipped if it is a global session, a local session, or (for an
+     * {@link UpgradeableSessionTracker}) currently being upgraded. Every other session is killed
+     * in the database at the data tree's last processed zxid and removed from the tracker.
+     *
+     * <p>Returns without doing anything when the server, its session tracker, or its database
+     * is {@code null}, or when no session owns ephemerals.
+     */
+    void purgeOrphanedEphemerals() {
+        if (zk == null) {
+            return;
+        }
+        SessionTracker sessionTracker = zk.getSessionTracker();
+        if (sessionTracker == null) {
+            return;
+        }
+        ZKDatabase zkDatabase = zk.getZKDatabase();
+        if (zkDatabase == null) {
+            return;
+        }
+
+        Set<Long> globalSessions = sessionTracker.globalSessions();
+        Set<Long> localSessions = sessionTracker.localSessions();
+        // Work on a copy: killSession() is called while looping and presumably changes which
+        // sessions own ephemerals.
+        Set<Long> sessionsWithEphemerals = new HashSet<>(zkDatabase.getSessions());
+        if (sessionsWithEphemerals.isEmpty()) {
+            return;
+        }
+
+        // The tracker's type is fixed for the whole loop, so resolve the upgradeable view once.
+        UpgradeableSessionTracker upgradeableTracker = sessionTracker instanceof UpgradeableSessionTracker
+            ? (UpgradeableSessionTracker) sessionTracker
+            : null;
+        long zxid = zkDatabase.getDataTreeLastProcessedZxid();
+        for (Long sessionId : sessionsWithEphemerals) {
+            boolean tracked = globalSessions.contains(sessionId)
+                || localSessions.contains(sessionId)
+                || (upgradeableTracker != null && upgradeableTracker.isUpgradingSession(sessionId));
+            if (tracked) {
+                continue;
+            }
+            LOG.warn(
+                "Removing ephemeral nodes for unknown session 0x{} after DIFF sync",
+                Long.toHexString(sessionId));
+            zkDatabase.killSession(sessionId, zxid);
+            sessionTracker.removeSession(sessionId);
+        }
     }
 
     protected void revalidate(QuorumPacket qp) throws IOException {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java
index d64d051b093..a79b3d7545e 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java
@@ -24,6 +24,8 @@
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static 
org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -50,6 +52,7 @@
 import org.apache.zookeeper.server.ExitCode;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.test.TestUtils;
 import org.apache.zookeeper.txn.CreateTxn;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.apache.zookeeper.util.ServiceUtils;
@@ -313,6 +316,47 @@ public void syncTest(@TempDir File tmpDir) throws Exception {
         assertEquals(startZxid, sl.zk.getLastProcessedZxid());
     }
 
+    /**
+     * Verifies that {@code purgeOrphanedEphemerals()} deletes an ephemeral node whose owning
+     * session is not known to the freshly created session tracker.
+     */
+    @Test
+    public void testPurgeOrphanedEphemerals() throws Exception {
+        // createTempFile only reserves a unique name; the file is deleted right away,
+        // presumably so FileTxnSnapLog can create a directory at that path.
+        File tmpFile = File.createTempFile("test", ".dir", testData);
+        tmpFile.delete();
+        SimpleLearner sl = null;
+        try {
+            FileTxnSnapLog ftsl = new FileTxnSnapLog(tmpFile, tmpFile);
+            sl = new SimpleLearner(ftsl);
+            ZKDatabase zkDb = sl.zk.getZKDatabase();
+
+            // Apply a create txn for an ephemeral node directly, using a session id that is never registered.
+            long sessionId = 0x1234L;
+            TxnHeader hdr = new TxnHeader(sessionId, 1, 1L, 1L, ZooDefs.OpCode.create);
+            CreateTxn txn = new CreateTxn(
+                "/eph",
+                new byte[0],
+                new ArrayList<>(),
+                true,
+                zkDb.getNode("/").stat.getCversion());
+            zkDb.processTxn(hdr, txn, null);
+
+            assertNotNull(zkDb.getNode("/eph"), "Ephemeral node should exist before cleanup");
+
+            sl.zk.createSessionTracker();
+            sl.purgeOrphanedEphemerals();
+
+            assertNull(zkDb.getNode("/eph"), "Ephemeral node should be removed for unknown session");
+        } finally {
+            if (sl != null) {
+                sl.zk.shutdown();
+            }
+            TestUtils.deleteFileRecursively(tmpFile);
+        }
+    }
+
     @Test
     public void truncFailTest(@TempDir File tmpDir) throws Exception {
         final boolean[] exitProcCalled = {false};