Commit ae71779

Author: Taichi Yamakawa
Merge remote-tracking branch 'origin/master' into add-entity-diagnostic-logs
2 parents 9e1d2a0 + 7629647 commit ae71779

10 files changed: +351 -116 lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
@@ -19,6 +19,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Enhance leader's replication response handling [PR#160](https://github.com/lerna-stack/akka-entity-replication/pull/160)
 - Change event sourcing log level to debug
   [PR#163](https://github.com/lerna-stack/akka-entity-replication/pull/163)
+- ReplicationRegionRaftActorStarter uses its FQCN as its logger name
+  [PR178](https://github.com/lerna-stack/akka-entity-replication/pull/178)
+- Add diagnostic info to logs of sending replication results
+  [PR#179](https://github.com/lerna-stack/akka-entity-replication/pull/179)

 ### Fixed
 - RaftActor might delete committed entries
@@ -44,6 +48,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Snapshot synchronization could remove committed log entries that not be included in snapshots
   [#167](https://github.com/lerna-stack/akka-entity-replication/issues/167)
   [#PR168](https://github.com/lerna-stack/akka-entity-replication/pull/168)
+- SnapshotStore doesn't reply with SnapshotNotFound sometimes
+  [#182](https://github.com/lerna-stack/akka-entity-replication/issues/182),
+  [#PR183](https://github.com/lerna-stack/akka-entity-replication/pull/183)

 ## [v2.1.0] - 2022-03-24
 [v2.1.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v2.0.0...v2.1.0

docs/typed/implementation_guide.md

Lines changed: 51 additions & 0 deletions
@@ -258,6 +258,57 @@ Consistency is ensured when it processes operations that can effect outside the
 The entity will output results base on the consistent up-to-date state even if under the network partitioning.
 The commands will be fail on one side of the partitioned network to keep consistency.

+### Detecting data inconsistencies by Entity Implementation
+
+While akka-entity-replication 2.2.0 or above closes some data inconsistency issues,
+detecting such inconsistency issues by entity implementation is preferred.
+An entity can use the following techniques to detect data inconsistencies:
+
+* To detect an event duplication and miss, use an event number. As the state of the entity, the entity holds the event
+  number (called LastAppliedEventNumber) of the last event the entity applied itself. Furthermore, the entity puts the
+  event number (specifically, LastAppliedEventNumber plus one) to an event. The event handler of the entity verifies
+  that the event has the expected event number (specifically, the event number must be equal to LastAppliedEventNumber
+  plus one). If this verification fails, either an event duplication or miss has happened.
+* To detect an event misdelivery, put the entity ID to an event. The event handler of the entity verifies that the event
+  has the same entity ID as its own. If this verification fails, an event misdelivery has happened.
+
+The following example illustrates how an entity detects data inconsistencies:
+
+```scala
+import lerna.akka.entityreplication.typed._
+
+object MyReplicatedEntity {
+  final case class Command()
+  final case class Event(entityId: String, eventNo: Long)
+  final case class State(lastAppliedEventNo: Long)
+
+  def apply(entityContext: ReplicatedEntityContext[Command]): Behavior[Command] =
+    ReplicatedEntityBehavior[Command, Event, State](
+      entityContext,
+      emptyState = State(lastAppliedEventNo = 0),
+      commandHandler = (state, command) => {
+        if (??? /* the command is not processed yet */) {
+          // Replicate an event as below:
+          // - To detect an event duplication and miss, put the event number (`state.lastAppliedEventNo + 1`) to the event.
+          // - To detect an event misdelivery, put the entity ID (`entityContext.entityId`) to the event.
+          Effect.replicate(Event(entityContext.entityId, state.lastAppliedEventNo + 1))
+        } else {
+          // Replicate nothing
+          ???
+        }
+      },
+      eventHandler = (state, event) => {
+        // To detect an event duplication and miss, verifies the event has the expected event number:
+        require(event.eventNo == state.lastAppliedEventNo + 1)
+        // To detect an event misdelivery, verifies the event has the expected entity ID:
+        require(event.entityId == entityContext.entityId)
+        // The next state must set the event number of the event to LastAppliedEventNo:
+        State(event.eventNo)
+      }
+    )
+}
+```
+
 ### Passivation

 You can stop entities that are not used to reduce memory consumption.
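
The detection technique added to the guide above can be tried without the library. The following is a minimal sketch, not the library's API: `InconsistencyDetectionSketch` and `applyEvent` are hypothetical names, and `applyEvent` only stands in for the `eventHandler` passed to `ReplicatedEntityBehavior`, with the entity's own ID passed as a plain argument instead of being read from `entityContext`.

```scala
object InconsistencyDetectionSketch {
  // Same shapes as the Event and State in the guide's example.
  final case class Event(entityId: String, eventNo: Long)
  final case class State(lastAppliedEventNo: Long)

  // Hypothetical stand-in for the entity's event handler.
  def applyEvent(ownEntityId: String, state: State, event: Event): State = {
    // Fails on a duplicated event (number too small) or a missed event (number too large).
    require(
      event.eventNo == state.lastAppliedEventNo + 1,
      s"expected event number ${state.lastAppliedEventNo + 1} but got ${event.eventNo}",
    )
    // Fails on a misdelivered event (an event that belongs to another entity).
    require(
      event.entityId == ownEntityId,
      s"expected entity ID $ownEntityId but got ${event.entityId}",
    )
    // The next state records the number of the event that was just applied.
    State(event.eventNo)
  }
}
```

A failed `require` throws `IllegalArgumentException`. For instance, applying `Event("entity-1", 1)` twice fails on the second call, because the expected number is 2 by then. How an entity should react to such a failure is not covered by the guide text in this commit.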

src/main/scala/lerna/akka/entityreplication/ReplicationRegionRaftActorStarter.scala

Lines changed: 1 addition & 0 deletions
@@ -35,6 +35,7 @@ private[entityreplication] object ReplicationRegionRaftActorStarter {
   ): Behavior[Nothing] = {
     Behaviors
       .setup[Command] { context =>
+        context.setLoggerName(this.getClass)
         val (disableIds, enableIds) = ids.partition(settings.disabledShards.contains)
         if (disableIds.nonEmpty) {
           context.log.info(

src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala

Lines changed: 29 additions & 6 deletions
@@ -304,8 +304,15 @@ private[raft] class RaftActor(
           .handleCommittedLogEntriesAndClients { entries =>
             entries.foreach {
               case (logEntry, Some(client)) =>
-                if (log.isDebugEnabled)
-                  log.debug("=== [Leader] committed {} and will notify it to {} ===", logEntry, client)
+                if (log.isDebugEnabled) {
+                  log.debug(
+                    s"=== [${currentState}] Committed event" +
+                    s" (index=[${logEntry.index}], term=[${logEntry.term.term}]," +
+                    s" entityId=[${logEntry.event.entityId.map(_.raw)}]," +
+                    s" type=[${logEntry.event.event.getClass.getName}])" +
+                    s" and sending ReplicationSucceeded to [$client] ===",
+                  )
+                }
                 client.forward(ReplicationSucceeded(logEntry.event.event, logEntry.index, client.instanceId))
               case (logEntry, None) =>
                 // A commit during recovery, or a commit of log entries that were still uncommitted when this member was promoted to leader
@@ -494,11 +501,27 @@ private[raft] class RaftActor(
     logEntry.event match {
       case EntityEvent(_, NoOp) => // NoOp is irrelevant to replicationActor, so it is not forwarded
       case EntityEvent(Some(entityId), event) =>
-        if (log.isDebugEnabled) log.debug("=== [{}] applying {} to ReplicationActor ===", currentState, event)
-        replicationActor(entityId) ! Replica(logEntry)
+        val targetReplicationActor = replicationActor(entityId)
+        if (log.isDebugEnabled) {
+          log.debug(
+            s"=== [${currentState}] Sending Replica to apply event" +
+            s" (index=[${logEntry.index}], term=[${logEntry.term.term}]," +
+            s" entityId=[${entityId.raw}]," +
+            s" eventType=[${event.getClass.getName}])" +
+            s" to ReplicationActor [$targetReplicationActor] ===",
+          )
+        }
+        targetReplicationActor ! Replica(logEntry)
       case EntityEvent(None, event) =>
-        if (log.isWarningEnabled)
-          log.warning("=== [{}] {} was not applied, because it is not assigned any entity ===", currentState, event)
+        if (log.isWarningEnabled) {
+          log.warning(
+            "[{}] event [index=[{}], term=[{}], type={}] was not applied, because it is not assigned any entity",
+            currentState,
+            logEntry.index,
+            logEntry.term.term,
+            event.getClass.getName,
+          )
+        }
     }

   def handleSnapshotTick(): Unit = {
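
The new log messages in this file print the entry's index, term, entity ID, and event class name in place of the whole `logEntry` or `event` value. The sketch below shows that formatting in isolation. It is an illustration under stated assumptions: `DiagnosticLogSketch`, `describe`, and the simplified `LogEntry` and `EntityEvent` case classes are hypothetical stand-ins, not the library's internal types.

```scala
object DiagnosticLogSketch {
  // Hypothetical, simplified stand-ins for the library's internal types.
  final case class EntityEvent(entityId: Option[String], event: Any)
  final case class LogEntry(index: Long, term: Long, event: EntityEvent)

  // Builds the diagnostic part of a log message from the entry's metadata only.
  // The event payload itself is never rendered, only its class name.
  def describe(entry: LogEntry): String =
    s"index=[${entry.index}], term=[${entry.term}], " +
      s"entityId=[${entry.event.entityId.getOrElse("-")}], " +
      s"type=[${entry.event.event.getClass.getName}]"
}
```

As in the diff, a caller would typically build such a string only inside an `if (log.isDebugEnabled)` guard, so the interpolation cost is paid only when debug logging is on.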

src/main/scala/lerna/akka/entityreplication/raft/snapshot/SnapshotStore.scala

Lines changed: 11 additions & 3 deletions
@@ -17,6 +17,11 @@ private[entityreplication] object SnapshotStore {
     selfMemberIndex: MemberIndex,
   ): Props =
     Props(new SnapshotStore(typeName, entityId, settings, selfMemberIndex))
+
+  /** Returns a persistence ID of SnapshotStore */
+  def persistenceId(typeName: TypeName, entityId: NormalizedEntityId, selfMemberIndex: MemberIndex): String =
+    ActorIds.persistenceId("SnapshotStore", typeName.underlying, entityId.underlying, selfMemberIndex.role)
+
 }

 private[entityreplication] class SnapshotStore(
@@ -29,7 +34,7 @@ private[entityreplication] class SnapshotStore(
   import SnapshotProtocol._

   override def persistenceId: String =
-    ActorIds.persistenceId("SnapshotStore", typeName.underlying, entityId.underlying, selfMemberIndex.role)
+    SnapshotStore.persistenceId(typeName, entityId, selfMemberIndex)

   override def journalPluginId: String = settings.journalPluginId

@@ -91,8 +96,11 @@ private[entityreplication] class SnapshotStore(
             cmd.entityId,
           )
         case FetchSnapshot(_, replyTo) =>
-          prevSnapshot.foreach { s =>
-            replyTo ! SnapshotProtocol.SnapshotFound(s)
+          prevSnapshot match {
+            case Some(prevSnapshot) =>
+              replyTo ! SnapshotProtocol.SnapshotFound(prevSnapshot)
+            case None =>
+              replyTo ! SnapshotProtocol.SnapshotNotFound(entityId)
           }
       }
     case _: persistence.SaveSnapshotSuccess =>
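
The last hunk replaces `prevSnapshot.foreach` with an exhaustive `match`, so a `FetchSnapshot` request gets a reply even when no previous snapshot exists. The sketch below models the replies as returned lists to make the difference visible. It is a simplified illustration: `FetchSnapshotReplySketch`, `repliesBefore`, and `repliesAfter` are hypothetical names, and snapshots and entity IDs are plain strings rather than the library's types.

```scala
object FetchSnapshotReplySketch {
  // Hypothetical, simplified reply protocol.
  sealed trait FetchSnapshotResponse
  final case class SnapshotFound(snapshot: String)    extends FetchSnapshotResponse
  final case class SnapshotNotFound(entityId: String) extends FetchSnapshotResponse

  // Old behaviour: like Option.foreach, nothing is sent when there is no snapshot.
  def repliesBefore(prevSnapshot: Option[String]): List[FetchSnapshotResponse] =
    prevSnapshot.map(s => SnapshotFound(s)).toList

  // New behaviour: exactly one reply is produced in either case.
  def repliesAfter(prevSnapshot: Option[String], entityId: String): List[FetchSnapshotResponse] =
    prevSnapshot match {
      case Some(s) => List(SnapshotFound(s))
      case None    => List(SnapshotNotFound(entityId))
    }
}
```

In this model, `repliesBefore(None)` is empty, which corresponds to a requester that never hears back; `repliesAfter(None, "entity-1")` yields a single `SnapshotNotFound("entity-1")`.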
