Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ public class MetricNames {
public static final String SERVER_PHYSICAL_STORAGE_LOCAL_SIZE = "localSize";
public static final String SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE = "remoteLogSize";

/**
* Total physical storage size of standby replica snapshots on this tablet server.
*
* <p>Standby replicas maintain downloaded KV snapshots for fast leader failover. This metric
* tracks the total size of all KV snapshot data downloaded by standby replicas on this server.
*/
public static final String SERVER_PHYSICAL_STORAGE_STANDBY_SIZE = "standbySize";

// --------------------------------------------------------------------------------------------
// metrics for user
// --------------------------------------------------------------------------------------------
Expand Down
4 changes: 4 additions & 0 deletions fluss-rpc/src/main/proto/FlussApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ message NotifyKvSnapshotOffsetRequest {
required int32 bucket_id = 3;
required int32 coordinator_epoch = 4;
required int64 min_retain_offset = 5;
optional int64 snapshot_id = 6;
}

message NotifyKvSnapshotOffsetResponse {
Expand Down Expand Up @@ -900,6 +901,7 @@ message PbAdjustIsrReqForBucket {
repeated int32 new_isr = 4 [packed = true];
required int32 coordinator_epoch = 5;
required int32 bucket_epoch = 6;
repeated int32 standby_replicas = 7 [packed = true];
}

message PbAdjustIsrRespForTable {
Expand All @@ -917,6 +919,7 @@ message PbAdjustIsrRespForBucket {
repeated int32 isr = 7 [packed = true];
optional int32 bucket_epoch = 8;
optional int32 coordinator_epoch = 9;
repeated int32 standby_replicas = 10 [packed = true];
}

message PbListOffsetsRespForBucket {
Expand All @@ -937,6 +940,7 @@ message PbNotifyLeaderAndIsrReqForBucket {
repeated int32 replicas = 5 [packed = true];
repeated int32 isr = 6 [packed = true];
required int32 bucket_epoch = 7;
repeated int32 standby_replicas = 8 [packed = true];
}

message PbNotifyLeaderAndIsrRespForBucket {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ protected int runInThreadPool(Runnable[] runnableJobs, String poolName) throws T
* @param tableBucket the table bucket
* @return the tablet directory
*/
protected File getOrCreateTabletDir(PhysicalTablePath tablePath, TableBucket tableBucket) {
public File getOrCreateTabletDir(PhysicalTablePath tablePath, TableBucket tableBucket) {
File tabletDir = getTabletDir(tablePath, tableBucket);
if (tabletDir.exists()) {
return tabletDir;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1618,6 +1618,7 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
// TODO: reject the request if there is a replica in ISR is not online,
// see KIP-841.
tryAdjustLeaderAndIsr.isr(),
tryAdjustLeaderAndIsr.standbyReplicas(),
coordinatorContext.getCoordinatorEpoch(),
currentLeaderAndIsr.bucketEpoch() + 1);
newLeaderAndIsrList.put(tableBucket, newLeaderAndIsr);
Expand Down Expand Up @@ -1740,7 +1741,9 @@ private void tryProcessCommitKvSnapshot(
completedSnapshotStore.add(completedSnapshot);
coordinatorEventManager.put(
new NotifyKvSnapshotOffsetEvent(
tb, completedSnapshot.getLogOffset()));
tb,
completedSnapshot.getLogOffset(),
completedSnapshot.getSnapshotID()));
callback.complete(new CommitKvSnapshotResponse());
} catch (Exception e) {
callback.completeExceptionally(e);
Expand All @@ -1750,7 +1753,6 @@ private void tryProcessCommitKvSnapshot(

private void processNotifyKvSnapshotOffsetEvent(NotifyKvSnapshotOffsetEvent event) {
TableBucket tb = event.getTableBucket();
long logOffset = event.getLogOffset();
coordinatorRequestBatch.newBatch();
coordinatorContext
.getBucketLeaderAndIsr(tb)
Expand All @@ -1761,7 +1763,8 @@ private void processNotifyKvSnapshotOffsetEvent(NotifyKvSnapshotOffsetEvent even
coordinatorContext.getFollowers(
tb, leaderAndIsr.leader()),
tb,
logOffset));
event.getLogOffset(),
event.getSnapshotId()));
coordinatorRequestBatch.sendNotifyKvSnapshotOffsetRequest(
coordinatorContext.getCoordinatorEpoch());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,15 +372,18 @@ public void addNotifyRemoteLogOffsetsRequestForTabletServers(
}

public void addNotifyKvSnapshotOffsetRequestForTabletServers(
List<Integer> tabletServers, TableBucket tableBucket, long minRetainOffset) {
List<Integer> tabletServers,
TableBucket tableBucket,
long minRetainOffset,
long snapshotId) {
tabletServers.stream()
.filter(s -> s >= 0)
.forEach(
id ->
notifyKvSnapshotOffsetRequestMap.put(
id,
makeNotifyKvSnapshotOffsetRequest(
tableBucket, minRetainOffset)));
tableBucket, minRetainOffset, snapshotId)));
}

public void addNotifyLakeTableOffsetRequestForTableServers(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ public class NotifyKvSnapshotOffsetEvent implements CoordinatorEvent {

private final TableBucket tableBucket;
private final long logOffset;
private final long snapshotId;

public NotifyKvSnapshotOffsetEvent(TableBucket tableBucket, long logOffset) {
public NotifyKvSnapshotOffsetEvent(TableBucket tableBucket, long logOffset, long snapshotId) {
this.tableBucket = tableBucket;
this.logOffset = logOffset;
this.snapshotId = snapshotId;
}

public TableBucket getTableBucket() {
Expand All @@ -38,4 +40,8 @@ public TableBucket getTableBucket() {
public long getLogOffset() {
return logOffset;
}

public long getSnapshotId() {
return snapshotId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.fluss.server.zk.data.LeaderAndIsr;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
Expand All @@ -41,23 +42,25 @@ public static class DefaultLeaderElection extends ReplicaLeaderElection {
* @param assignments the assignments
* @param aliveReplicas the alive replicas
* @param leaderAndIsr the original leaderAndIsr
* @param isPrimaryKeyTable whether this table bucket is primary key table
* @return the election result
*/
public Optional<ElectionResult> leaderElection(
List<Integer> assignments, List<Integer> aliveReplicas, LeaderAndIsr leaderAndIsr) {
// currently, we always use the first replica in assignment, which also in aliveReplicas
// and
// isr as the leader replica.
List<Integer> assignments,
List<Integer> aliveReplicas,
LeaderAndIsr leaderAndIsr,
boolean isPrimaryKeyTable) {
List<Integer> isr = leaderAndIsr.isr();
for (int assignment : assignments) {
if (aliveReplicas.contains(assignment) && isr.contains(assignment)) {
return Optional.of(
new TableBucketStateMachine.ElectionResult(
aliveReplicas, leaderAndIsr.newLeaderAndIsr(assignment, isr)));
}
}

return Optional.empty();
// First we will filter out the assignment list to only contain the alive replicas and
// isr.
List<Integer> availableReplicas =
assignments.stream()
.filter(
replica ->
aliveReplicas.contains(replica)
&& isr.contains(replica))
.collect(Collectors.toList());
return electLeader(availableReplicas, aliveReplicas, leaderAndIsr, isPrimaryKeyTable);
}
}

Expand All @@ -70,32 +73,66 @@ public static class ControlledShutdownLeaderElection extends ReplicaLeaderElecti
* @param aliveReplicas the alive replicas
* @param leaderAndIsr the original leaderAndIsr
* @param shutdownTabletServers the shutdown tabletServers
* @param isPrimaryKeyTable whether this table bucket is primary key table
* @return the election result
*/
public Optional<ElectionResult> leaderElection(
List<Integer> assignments,
List<Integer> aliveReplicas,
LeaderAndIsr leaderAndIsr,
Set<Integer> shutdownTabletServers) {
Set<Integer> shutdownTabletServers,
boolean isPrimaryKeyTable) {
List<Integer> originIsr = leaderAndIsr.isr();
Set<Integer> isrSet = new HashSet<>(originIsr);
for (Integer id : assignments) {
if (aliveReplicas.contains(id)
&& isrSet.contains(id)
&& !shutdownTabletServers.contains(id)) {
Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas);
newAliveReplicas.removeAll(shutdownTabletServers);
List<Integer> newIsr =
originIsr.stream()
.filter(replica -> !shutdownTabletServers.contains(replica))
.collect(Collectors.toList());
return Optional.of(
new ElectionResult(
new ArrayList<>(newAliveReplicas),
leaderAndIsr.newLeaderAndIsr(id, newIsr)));
}
// Filter out available replicas: alive, in ISR, and not shutting down.
List<Integer> availableReplicas =
assignments.stream()
.filter(
replica ->
aliveReplicas.contains(replica)
&& isrSet.contains(replica)
&& !shutdownTabletServers.contains(replica))
.collect(Collectors.toList());

if (availableReplicas.isEmpty()) {
return Optional.empty();
}
return Optional.empty();

// For log table, simply use the first available replica as leader.
if (!isPrimaryKeyTable) {
List<Integer> newIsr =
originIsr.stream()
.filter(replica -> !shutdownTabletServers.contains(replica))
.collect(Collectors.toList());
Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas);
newAliveReplicas.removeAll(shutdownTabletServers);
return Optional.of(
new ElectionResult(
new ArrayList<>(newAliveReplicas),
leaderAndIsr.newLeaderAndIsr(
availableReplicas.get(0),
newIsr,
Collections.emptyList())));
}

// For PK table, elect leader and standby.
LeaderAndStandby leaderAndStandby =
electLeaderAndStandbyForPkTable(availableReplicas, leaderAndIsr);

Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas);
newAliveReplicas.removeAll(shutdownTabletServers);
List<Integer> newIsr =
originIsr.stream()
.filter(replica -> !shutdownTabletServers.contains(replica))
.collect(Collectors.toList());

return Optional.of(
new ElectionResult(
new ArrayList<>(newAliveReplicas),
leaderAndIsr.newLeaderAndIsr(
leaderAndStandby.leader,
newIsr,
leaderAndStandby.standbyReplicas)));
}
}

Expand All @@ -108,20 +145,109 @@ public ReassignmentLeaderElection(List<Integer> newReplicas) {
}

public Optional<ElectionResult> leaderElection(
List<Integer> liveReplicas, LeaderAndIsr leaderAndIsr) {
// currently, we always use the first replica in targetReplicas, which also in
List<Integer> liveReplicas, LeaderAndIsr leaderAndIsr, boolean isPrimaryKeyTable) {
// Currently, we always use the first replica in targetReplicas, which also in
// liveReplicas and isr as the leader replica. For bucket reassignment, the first
// replica is the target leader replica.
List<Integer> isr = leaderAndIsr.isr();
for (int assignment : newReplicas) {
if (liveReplicas.contains(assignment) && isr.contains(assignment)) {
return Optional.of(
new ElectionResult(
liveReplicas, leaderAndIsr.newLeaderAndIsr(assignment, isr)));
}
}
List<Integer> availableReplicas =
newReplicas.stream()
.filter(
replica ->
liveReplicas.contains(replica) && isr.contains(replica))
.collect(Collectors.toList());
return electLeader(availableReplicas, liveReplicas, leaderAndIsr, isPrimaryKeyTable);
}
}

// ------------------------------------------------------------------------
// Common election logic
// ------------------------------------------------------------------------

private static Optional<ElectionResult> electLeader(
List<Integer> availableReplicas,
List<Integer> aliveReplicas,
LeaderAndIsr leaderAndIsr,
boolean isPrimaryKeyTable) {
if (availableReplicas.isEmpty()) {
return Optional.empty();
}

List<Integer> isr = leaderAndIsr.isr();

// For log table, simply use the first available replica as leader.
if (!isPrimaryKeyTable) {
return Optional.of(
new ElectionResult(
aliveReplicas,
leaderAndIsr.newLeaderAndIsr(
availableReplicas.get(0), isr, Collections.emptyList())));
}

// For PK table, elect leader and standby.
LeaderAndStandby leaderAndStandby =
electLeaderAndStandbyForPkTable(availableReplicas, leaderAndIsr);
return Optional.of(
new ElectionResult(
aliveReplicas,
leaderAndIsr.newLeaderAndIsr(
leaderAndStandby.leader, isr, leaderAndStandby.standbyReplicas)));
}

/**
* Elect leader and standby for PK table.
*
* <p>Election strategy:
*
* <ul>
* <li>If current standby exists and is available, promote it to leader
* <li>Otherwise, use the first available replica as leader
* <li>Select new standby from remaining available replicas (if any)
* </ul>
*/
private static LeaderAndStandby electLeaderAndStandbyForPkTable(
List<Integer> availableReplicas, LeaderAndIsr leaderAndIsr) {
int currentStandby = getCurrentStandby(leaderAndIsr);
int newLeader;

if (currentStandby != -1 && availableReplicas.contains(currentStandby)) {
// Promote current standby to leader.
newLeader = currentStandby;
} else {
// Use first available replica as leader.
newLeader = availableReplicas.get(0);
}

// Find new standby from remaining replicas.
List<Integer> standbyReplicas = findNewStandby(availableReplicas, newLeader);
return new LeaderAndStandby(newLeader, standbyReplicas);
}

/** Get current standby replica ID, returns -1 if no standby exists. */
private static int getCurrentStandby(LeaderAndIsr leaderAndIsr) {
return leaderAndIsr.standbyReplicas().isEmpty()
? -1
: leaderAndIsr.standbyReplicas().get(0);
}

/** Find new standby from available replicas, excluding the leader. */
private static List<Integer> findNewStandby(
List<Integer> availableReplicas, int excludeLeader) {
return availableReplicas.stream()
.filter(replica -> replica != excludeLeader)
.findFirst()
.map(Collections::singletonList)
.orElse(Collections.emptyList());
}

/** Internal class to hold leader and standby election result. */
private static class LeaderAndStandby {
final int leader;
final List<Integer> standbyReplicas;

LeaderAndStandby(int leader, List<Integer> standbyReplicas) {
this.leader = leader;
this.standbyReplicas = standbyReplicas;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,8 @@ private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
.collect(Collectors.toList());
LeaderAndIsr adjustLeaderAndIsr =
newLeader == LeaderAndIsr.NO_LEADER
? leaderAndIsr.newLeaderAndIsr(newLeader, newIsr)
? leaderAndIsr.newLeaderAndIsr(
newLeader, newIsr, leaderAndIsr.standbyReplicas())
: leaderAndIsr.newLeaderAndIsr(newIsr);
adjustedLeaderAndIsr.put(tableBucketReplica, adjustLeaderAndIsr);
toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr);
Expand Down
Loading