Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions fluss-rpc/src/main/proto/FlussApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,7 @@ message PbAdjustIsrReqForBucket {
repeated int32 new_isr = 4 [packed = true];
required int32 coordinator_epoch = 5;
required int32 bucket_epoch = 6;
repeated int32 standby_replicas = 7 [packed = true];
}

message PbAdjustIsrRespForTable {
Expand All @@ -917,6 +918,7 @@ message PbAdjustIsrRespForBucket {
repeated int32 isr = 7 [packed = true];
optional int32 bucket_epoch = 8;
optional int32 coordinator_epoch = 9;
repeated int32 standby_replicas = 10 [packed = true];
}

message PbListOffsetsRespForBucket {
Expand All @@ -937,6 +939,7 @@ message PbNotifyLeaderAndIsrReqForBucket {
repeated int32 replicas = 5 [packed = true];
repeated int32 isr = 6 [packed = true];
required int32 bucket_epoch = 7;
repeated int32 standby_replicas = 8 [packed = true];
}

message PbNotifyLeaderAndIsrRespForBucket {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1618,6 +1618,7 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
// TODO: reject the request if any replica in the ISR is not online,
// see KIP-841.
tryAdjustLeaderAndIsr.isr(),
tryAdjustLeaderAndIsr.standbyReplicas(),
coordinatorContext.getCoordinatorEpoch(),
currentLeaderAndIsr.bucketEpoch() + 1);
newLeaderAndIsrList.put(tableBucket, newLeaderAndIsr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.fluss.server.zk.data.LeaderAndIsr;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
Expand All @@ -41,23 +42,25 @@ public static class DefaultLeaderElection extends ReplicaLeaderElection {
* @param assignments the assignments
* @param aliveReplicas the alive replicas
* @param leaderAndIsr the original leaderAndIsr
* @param isPrimaryKeyTable whether this table bucket is primary key table
* @return the election result
*/
public Optional<ElectionResult> leaderElection(
List<Integer> assignments, List<Integer> aliveReplicas, LeaderAndIsr leaderAndIsr) {
// currently, we always use the first replica in assignment, which also in aliveReplicas
// and
// isr as the leader replica.
List<Integer> assignments,
List<Integer> aliveReplicas,
LeaderAndIsr leaderAndIsr,
boolean isPrimaryKeyTable) {
List<Integer> isr = leaderAndIsr.isr();
for (int assignment : assignments) {
if (aliveReplicas.contains(assignment) && isr.contains(assignment)) {
return Optional.of(
new TableBucketStateMachine.ElectionResult(
aliveReplicas, leaderAndIsr.newLeaderAndIsr(assignment, isr)));
}
}

return Optional.empty();
// First, narrow the assignment list down to the replicas that are both alive and in
// the ISR.
List<Integer> availableReplicas =
assignments.stream()
.filter(
replica ->
aliveReplicas.contains(replica)
&& isr.contains(replica))
.collect(Collectors.toList());
return electLeader(availableReplicas, aliveReplicas, leaderAndIsr, isPrimaryKeyTable);
}
}

Expand All @@ -70,32 +73,66 @@ public static class ControlledShutdownLeaderElection extends ReplicaLeaderElecti
* @param aliveReplicas the alive replicas
* @param leaderAndIsr the original leaderAndIsr
* @param shutdownTabletServers the shutdown tabletServers
* @param isPrimaryKeyTable whether this table bucket is primary key table
* @return the election result
*/
public Optional<ElectionResult> leaderElection(
List<Integer> assignments,
List<Integer> aliveReplicas,
LeaderAndIsr leaderAndIsr,
Set<Integer> shutdownTabletServers) {
Set<Integer> shutdownTabletServers,
boolean isPrimaryKeyTable) {
List<Integer> originIsr = leaderAndIsr.isr();
Set<Integer> isrSet = new HashSet<>(originIsr);
for (Integer id : assignments) {
if (aliveReplicas.contains(id)
&& isrSet.contains(id)
&& !shutdownTabletServers.contains(id)) {
Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas);
newAliveReplicas.removeAll(shutdownTabletServers);
List<Integer> newIsr =
originIsr.stream()
.filter(replica -> !shutdownTabletServers.contains(replica))
.collect(Collectors.toList());
return Optional.of(
new ElectionResult(
new ArrayList<>(newAliveReplicas),
leaderAndIsr.newLeaderAndIsr(id, newIsr)));
}
// Filter out available replicas: alive, in ISR, and not shutting down.
List<Integer> availableReplicas =
assignments.stream()
.filter(
replica ->
aliveReplicas.contains(replica)
&& isrSet.contains(replica)
&& !shutdownTabletServers.contains(replica))
.collect(Collectors.toList());

if (availableReplicas.isEmpty()) {
return Optional.empty();
}
return Optional.empty();

// For log table, simply use the first available replica as leader.
if (!isPrimaryKeyTable) {
List<Integer> newIsr =
originIsr.stream()
.filter(replica -> !shutdownTabletServers.contains(replica))
.collect(Collectors.toList());
Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas);
newAliveReplicas.removeAll(shutdownTabletServers);
return Optional.of(
new ElectionResult(
new ArrayList<>(newAliveReplicas),
leaderAndIsr.newLeaderAndIsr(
availableReplicas.get(0),
newIsr,
Collections.emptyList())));
}

// For PK table, elect leader and standby.
LeaderAndStandby leaderAndStandby =
electLeaderAndStandbyForPkTable(availableReplicas, leaderAndIsr);

Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas);
newAliveReplicas.removeAll(shutdownTabletServers);
List<Integer> newIsr =
originIsr.stream()
.filter(replica -> !shutdownTabletServers.contains(replica))
.collect(Collectors.toList());

return Optional.of(
new ElectionResult(
new ArrayList<>(newAliveReplicas),
leaderAndIsr.newLeaderAndIsr(
leaderAndStandby.leader,
newIsr,
leaderAndStandby.standbyReplicas)));
}
}

Expand All @@ -108,20 +145,109 @@ public ReassignmentLeaderElection(List<Integer> newReplicas) {
}

public Optional<ElectionResult> leaderElection(
List<Integer> liveReplicas, LeaderAndIsr leaderAndIsr) {
// currently, we always use the first replica in targetReplicas, which also in
List<Integer> liveReplicas, LeaderAndIsr leaderAndIsr, boolean isPrimaryKeyTable) {
// Candidates are the replicas in targetReplicas that are also in liveReplicas and in
// the ISR, kept in target order. For bucket reassignment, the first replica is the
// target leader replica.
List<Integer> isr = leaderAndIsr.isr();
for (int assignment : newReplicas) {
if (liveReplicas.contains(assignment) && isr.contains(assignment)) {
return Optional.of(
new ElectionResult(
liveReplicas, leaderAndIsr.newLeaderAndIsr(assignment, isr)));
}
}
List<Integer> availableReplicas =
newReplicas.stream()
.filter(
replica ->
liveReplicas.contains(replica) && isr.contains(replica))
.collect(Collectors.toList());
return electLeader(availableReplicas, liveReplicas, leaderAndIsr, isPrimaryKeyTable);
}
}

// ------------------------------------------------------------------------
// Common election logic
// ------------------------------------------------------------------------

/**
 * Shared election routine used once the caller has computed the candidate list.
 *
 * <p>Returns an empty result when there is no candidate. For a log table the first candidate
 * becomes leader and the standby list is empty; for a primary key table the leader and standby
 * come from {@code electLeaderAndStandbyForPkTable}. The ISR of {@code leaderAndIsr} is carried
 * over unchanged.
 *
 * @param availableReplicas the candidate replicas, in preference order
 * @param aliveReplicas the alive replicas, passed through to the election result
 * @param leaderAndIsr the original leaderAndIsr
 * @param isPrimaryKeyTable whether this table bucket is primary key table
 * @return the election result, or empty if there is no candidate
 */
private static Optional<ElectionResult> electLeader(
        List<Integer> availableReplicas,
        List<Integer> aliveReplicas,
        LeaderAndIsr leaderAndIsr,
        boolean isPrimaryKeyTable) {
    if (availableReplicas.isEmpty()) {
        return Optional.empty();
    }

    List<Integer> currentIsr = leaderAndIsr.isr();

    final int electedLeader;
    final List<Integer> electedStandbys;
    if (isPrimaryKeyTable) {
        // PK table: pick a leader together with its standby.
        LeaderAndStandby elected =
                electLeaderAndStandbyForPkTable(availableReplicas, leaderAndIsr);
        electedLeader = elected.leader;
        electedStandbys = elected.standbyReplicas;
    } else {
        // Log table: the first candidate wins and there is no standby.
        electedLeader = availableReplicas.get(0);
        electedStandbys = Collections.emptyList();
    }

    LeaderAndIsr updated =
            leaderAndIsr.newLeaderAndIsr(electedLeader, currentIsr, electedStandbys);
    return Optional.of(new ElectionResult(aliveReplicas, updated));
}

/**
 * Picks the leader and the standby for a primary key table bucket.
 *
 * <p>Rules, in order:
 *
 * <ul>
 *   <li>the current standby becomes leader when one exists and is among the candidates;
 *   <li>otherwise the first candidate becomes leader;
 *   <li>the standby is then chosen from the remaining candidates, if there are any.
 * </ul>
 *
 * <p>The candidate list is read at index 0 without a check, so callers must pass a non-empty
 * list.
 */
private static LeaderAndStandby electLeaderAndStandbyForPkTable(
        List<Integer> availableReplicas, LeaderAndIsr leaderAndIsr) {
    int standby = getCurrentStandby(leaderAndIsr);
    boolean promoteStandby = standby != -1 && availableReplicas.contains(standby);

    // Promote the existing standby when possible, else fall back to the first candidate.
    int leader = promoteStandby ? standby : availableReplicas.get(0);

    return new LeaderAndStandby(leader, findNewStandby(availableReplicas, leader));
}

/** Returns the first entry of the standby replica list, or -1 when that list is empty. */
private static int getCurrentStandby(LeaderAndIsr leaderAndIsr) {
    List<Integer> standbys = leaderAndIsr.standbyReplicas();
    if (standbys.isEmpty()) {
        return -1;
    }
    return standbys.get(0);
}

/**
 * Returns a single-element list holding the first candidate that is not the leader, or an empty
 * list when every candidate is the leader (or there are no candidates).
 */
private static List<Integer> findNewStandby(
        List<Integer> availableReplicas, int excludeLeader) {
    for (Integer candidate : availableReplicas) {
        if (candidate != excludeLeader) {
            return Collections.singletonList(candidate);
        }
    }
    return Collections.emptyList();
}

/**
 * Internal value holder pairing an elected leader with the standby replicas chosen alongside
 * it. Both fields are final and are read directly by the election helpers in this file.
 */
private static class LeaderAndStandby {
// The replica id elected as leader.
final int leader;
// The standby replicas chosen with that leader; stored as passed in, without copying.
final List<Integer> standbyReplicas;

LeaderAndStandby(int leader, List<Integer> standbyReplicas) {
this.leader = leader;
this.standbyReplicas = standbyReplicas;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,8 @@ private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
.collect(Collectors.toList());
LeaderAndIsr adjustLeaderAndIsr =
newLeader == LeaderAndIsr.NO_LEADER
? leaderAndIsr.newLeaderAndIsr(newLeader, newIsr)
? leaderAndIsr.newLeaderAndIsr(
newLeader, newIsr, leaderAndIsr.standbyReplicas())
: leaderAndIsr.newLeaderAndIsr(newIsr);
adjustedLeaderAndIsr.put(tableBucketReplica, adjustLeaderAndIsr);
toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,12 @@ private Optional<ElectionResult> doInitElectionForBucket(
// servers as inSyncReplica set.
Optional<ElectionResult> resultOpt =
initReplicaLeaderElection(
assignedServers, liveServers, coordinatorContext.getCoordinatorEpoch());
assignedServers,
liveServers,
coordinatorContext.getCoordinatorEpoch(),
coordinatorContext
.getTableInfoById(tableBucket.getTableId())
.hasPrimaryKey());
if (!resultOpt.isPresent()) {
LOG.error(
"The leader election for table bucket {} is empty.",
Expand Down Expand Up @@ -613,10 +618,12 @@ private Optional<ElectionResult> electLeader(
}

Optional<ElectionResult> resultOpt = Optional.empty();
boolean isPkTable =
coordinatorContext.getTableInfoById(tableBucket.getTableId()).hasPrimaryKey();
if (electionStrategy instanceof DefaultLeaderElection) {
resultOpt =
((DefaultLeaderElection) electionStrategy)
.leaderElection(assignment, liveReplicas, leaderAndIsr);
.leaderElection(assignment, liveReplicas, leaderAndIsr, isPkTable);
} else if (electionStrategy instanceof ControlledShutdownLeaderElection) {
Set<Integer> shuttingDownTabletServers = coordinatorContext.shuttingDownTabletServers();
resultOpt =
Expand All @@ -625,11 +632,12 @@ private Optional<ElectionResult> electLeader(
assignment,
liveReplicas,
leaderAndIsr,
shuttingDownTabletServers);
shuttingDownTabletServers,
isPkTable);
} else if (electionStrategy instanceof ReassignmentLeaderElection) {
resultOpt =
((ReassignmentLeaderElection) electionStrategy)
.leaderElection(liveReplicas, leaderAndIsr);
.leaderElection(liveReplicas, leaderAndIsr, isPkTable);
}

if (!resultOpt.isPresent()) {
Expand Down Expand Up @@ -670,23 +678,40 @@ public LeaderAndIsr getLeaderAndIsr() {
* @param assignments the assignments
* @param aliveReplicas the alive replicas
* @param coordinatorEpoch the coordinator epoch
* @param isPrimaryKeyTable whether this table bucket is primary key table
* @return the election result
*/
@VisibleForTesting
public static Optional<ElectionResult> initReplicaLeaderElection(
List<Integer> assignments, List<Integer> aliveReplicas, int coordinatorEpoch) {
// currently, we always use the first replica in assignment, which also in aliveReplicas and
// isr as the leader replica.
for (int assignment : assignments) {
if (aliveReplicas.contains(assignment)) {
return Optional.of(
new ElectionResult(
aliveReplicas,
new LeaderAndIsr(
assignment, 0, aliveReplicas, coordinatorEpoch, 0)));
List<Integer> assignments,
List<Integer> aliveReplicas,
int coordinatorEpoch,
boolean isPrimaryKeyTable) {
// First we will filter out the assignment list to only contain the alive replicas.
List<Integer> availableReplicas =
assignments.stream().filter(aliveReplicas::contains).collect(Collectors.toList());

// If no assigned replica is alive, no leader can be elected, so we return empty.
if (availableReplicas.isEmpty()) {
return Optional.empty();
}

// Then we will use the first replica in assignment as the leader replica.
int leader = availableReplicas.get(0);

// If this is a primary key table, we use the second available replica as the standby,
// if one exists.
List<Integer> standbyReplicas = new ArrayList<>();
if (isPrimaryKeyTable) {
if (availableReplicas.size() > 1) {
standbyReplicas.add(availableReplicas.get(1));
}
}

return Optional.empty();
return Optional.of(
new ElectionResult(
aliveReplicas,
new LeaderAndIsr(
leader, 0, aliveReplicas, standbyReplicas, coordinatorEpoch, 0)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,12 @@ public int[] getIsrArray() {
public LeaderAndIsr getLeaderAndIsr() {
return leaderAndIsr;
}

/** Returns the standby replica list held by {@code leaderAndIsr}. */
public List<Integer> getStandbyReplicas() {
return leaderAndIsr.standbyReplicas();
}

/** Returns the standby replica ids of {@code leaderAndIsr} as a primitive {@code int} array. */
public int[] getStandbyReplicasArray() {
    List<Integer> standbys = leaderAndIsr.standbyReplicas();
    int[] ids = new int[standbys.size()];
    int next = 0;
    for (Integer id : standbys) {
        ids[next++] = id;
    }
    return ids;
}
}
Loading