Skip to content

Commit cc3458b

Browse files
authored
Merge pull request #137 from lerna-stack/event-sourcing-progress-tacking
Tracks progress of event sourcing
2 parents 71c1cef + 9ee278b commit cc3458b

35 files changed

+2841
-315
lines changed

CHANGELOG.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1818
This feature is enabled only by using `typed.ClusterReplication`.
1919
It is highly recommended that you switch using the typed API since the classic API was deprecated.
2020

21+
- Raft actors track the progress of the event sourcing [#136](https://github.com/lerna-stack/akka-entity-replication/issues/136).
22+
23+
This feature ensures that
24+
- Event Sourcing won't halt even if the event-sourcing store is unavailable for a long period.
25+
After the event-sourcing store recovers, Event Sourcing will work again automatically.
26+
- Compaction won't delete committed events that are not persisted to the event-sourcing store yet.
27+
28+
It adds new following settings (for more details, please see `reference.conf`):
29+
- `lerna.akka.entityreplication.raft.eventsourced.committed-log-entries-check-interval`
30+
- `lerna.akka.entityreplication.raft.eventsourced.max-append-committed-entries-size`
31+
- `lerna.akka.entityreplication.raft.eventsourced.max-append-committed-entries-batch-size`
32+
33+
It deletes the following settings:
34+
- `lerna.akka.entityreplication.raft.eventsourced.commit-log-store.retry.attempts`
35+
- `lerna.akka.entityreplication.raft.eventsourced.commit-log-store.retry.delay`
36+
37+
It requires that
38+
`lerna.akka.entityreplication.raft.compaction.preserve-log-size` is less than
39+
`lerna.akka.entityreplication.raft.compaction.log-size-threshold`.
40+
41+
2142
### Changed
2243
- Bump up Akka version to 2.6.17 [PR#98](https://github.com/lerna-stack/akka-entity-replication/pull/98)
2344

docs/implementation_guide.md

Lines changed: 2 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -166,81 +166,8 @@ AtLeastOnceComplete.askTo(
166166

167167
### Configuration
168168

169-
On the command side, there are the following settings.
169+
On the command side, the related settings are defined at `lerna.akka.entityreplication`(except `lerna.akka.entityreplication.raft.eventsourced`) in [reference.conf](/src/main/resources/reference.conf).
170170

171-
```hocon
172-
lerna.akka.entityreplication {
173-
174-
// How long wait before giving up entity recovery.
175-
// Entity recovery requires a snapshot, and failure fetching it will cause this timeout.
176-
// If timed out, entity recovery will be retried.
177-
recovery-entity-timeout = 10s
178-
179-
raft {
180-
// The time it takes to start electing a new leader after the heartbeat is no longer received from the leader.
181-
election-timeout = 750 ms
182-
183-
// The interval between leaders sending heartbeats to their followers
184-
heartbeat-interval = 100 ms
185-
186-
// A role to identify the nodes to place replicas on
187-
// The number of roles is the number of replicas. It is recommended to set up at least three roles.
188-
multi-raft-roles = ["replica-group-1", "replica-group-2", "replica-group-3"]
189-
190-
// Maximum number of entries which AppendEntries contains.
191-
// The too large size will cause message serialization failure.
192-
max-append-entries-size = 16
193-
194-
// The maximum number of AppendEnteis that will be sent at once at every heartbeat-interval.
195-
max-append-entries-batch-size = 10
196-
197-
// log compaction settings
198-
compaction {
199-
200-
// Time interval to check the size of the log and check if a snapshotting is needed to be taken
201-
log-size-check-interval = 10s
202-
203-
// Threshold for saving snapshots and compaction of the log.
204-
// If this value is too large, your application will use a lot of memory and you may get an OutOfMemoryError.
205-
// If this value is too small, it compaction may occur frequently and overload the application and the data store.
206-
log-size-threshold = 50000
207-
208-
// Preserving log entries from log reduction to avoid log replication failure.
209-
// If more number of logs than this value cannot be synchronized, the raft member will be unavailable.
210-
// It is recommended to set this value even less than log-size-threshold. Otherwise compaction will be run at every log-size-check-interval.
211-
preserve-log-size = 10000
212-
213-
// Time to keep a cache of snapshots in memory
214-
snapshot-cache-time-to-live = 10s
215-
}
216-
217-
// snapshot synchronization settings
218-
snapshot-sync {
219-
220-
// Number of snapshots of entities that are copied in parallel
221-
snapshot-copying-parallelism = 10
222-
223-
// Time to abort operations related to persistence
224-
persistence-operation-timeout = 10s
225-
}
226-
227-
// data persistent settings
228-
persistence {
229-
// Absolute path to the journal plugin configuration entry.
230-
// The journal will be stored events which related to Raft.
231-
journal.plugin = ""
232-
233-
// Absolute path to the snapshot store plugin configuration entry.
234-
// The snapshot store will be stored state which related to Raft.
235-
snapshot-store.plugin = ""
236-
237-
// Absolute path to the query plugin configuration entry.
238-
// Snapshot synchronization reads events that related to Raft.
239-
query.plugin = ""
240-
}
241-
}
242-
}
243-
```
244171

245172
## Read Side
246173

@@ -358,33 +285,8 @@ object EventHandler {
358285

359286
### Configuration
360287

361-
On the read side, there are the following settings.
362-
363-
```hocon
364-
lerna.akka.entityreplication.raft.eventsourced {
365-
// Settings for saving committed events from each RaftActor
366-
commit-log-store {
367-
// Retry setting to prevent events from being lost if commit-log-store(sharding) stops temporarily
368-
retry {
369-
attempts = 15
370-
delay = 3 seconds
371-
}
372-
}
373-
374-
persistence {
375-
// Absolute path to the journal plugin configuration entry.
376-
// The journal stores Raft-committed events.
377-
journal.plugin = ""
378-
379-
// Absolute path to the snapshot-store plugin configuration entry.
380-
// The snapshot-store stores a state (snapshot) built from Raft-committed events.
381-
snapshot-store.plugin = ""
288+
On the read side, the related settings are defined at `lerna.akka.entityreplication.raft.eventsourced` in [reference.conf](/src/main/resources/reference.conf).
382289

383-
// Snapshot after this number of events.
384-
snapshot-every = 1000
385-
}
386-
}
387-
```
388290

389291
## Persistence plugin configuration
390292

docs/typed/implementation_guide.md

Lines changed: 2 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -360,87 +360,8 @@ This is useful when you would like to change the datastore that persists events
360360

361361
### Configuration
362362

363-
On the command side, there are the following settings.
363+
On the command side, the related settings are defined at `lerna.akka.entityreplication`(except `lerna.akka.entityreplication.raft.eventsourced`) in [reference.conf](/src/main/resources/reference.conf).
364364

365-
```hocon
366-
lerna.akka.entityreplication {
367-
368-
// How long wait before giving up entity recovery.
369-
// Entity recovery requires a snapshot, and failure fetching it will cause this timeout.
370-
// If timed out, entity recovery will be retried.
371-
recovery-entity-timeout = 10s
372-
373-
raft {
374-
// The time it takes to start electing a new leader after the heartbeat is no longer received from the leader.
375-
election-timeout = 750 ms
376-
377-
// The interval between leaders sending heartbeats to their followers
378-
heartbeat-interval = 100 ms
379-
380-
// A role to identify the nodes to place replicas on
381-
// The number of roles is the number of replicas. It is recommended to set up at least three roles.
382-
multi-raft-roles = ["replica-group-1", "replica-group-2", "replica-group-3"]
383-
384-
// Number of shards per single multi-raft-role used by only typed APIs.
385-
// This value must be the same for all nodes in the cluster
386-
// and must not be changed after starting to use.
387-
// Changing this value will cause data inconsistency.
388-
number-of-shards = 100
389-
390-
// Maximum number of entries which AppendEntries contains.
391-
// The too large size will cause message serialization failure.
392-
max-append-entries-size = 16
393-
394-
// The maximum number of AppendEnteis that will be sent at once at every heartbeat-interval.
395-
max-append-entries-batch-size = 10
396-
397-
// log compaction settings
398-
compaction {
399-
400-
// Time interval to check the size of the log and check if a snapshotting is needed to be taken
401-
log-size-check-interval = 10s
402-
403-
// Threshold for saving snapshots and compaction of the log.
404-
// If this value is too large, your application will use a lot of memory and you may get an OutOfMemoryError.
405-
// If this value is too small, it compaction may occur frequently and overload the application and the data store.
406-
log-size-threshold = 50000
407-
408-
// Preserving log entries from log reduction to avoid log replication failure.
409-
// If more number of logs than this value cannot be synchronized, the raft member will be unavailable.
410-
// It is recommended to set this value even less than log-size-threshold. Otherwise compaction will be run at every log-size-check-interval.
411-
preserve-log-size = 10000
412-
413-
// Time to keep a cache of snapshots in memory
414-
snapshot-cache-time-to-live = 10s
415-
}
416-
417-
// snapshot synchronization settings
418-
snapshot-sync {
419-
420-
// Number of snapshots of entities that are copied in parallel
421-
snapshot-copying-parallelism = 10
422-
423-
// Time to abort operations related to persistence
424-
persistence-operation-timeout = 10s
425-
}
426-
427-
// data persistent settings
428-
persistence {
429-
// Absolute path to the journal plugin configuration entry.
430-
// The journal will be stored events which related to Raft.
431-
journal.plugin = ""
432-
433-
// Absolute path to the snapshot store plugin configuration entry.
434-
// The snapshot store will be stored state which related to Raft.
435-
snapshot-store.plugin = ""
436-
437-
// Absolute path to the query plugin configuration entry.
438-
// Snapshot synchronization reads events that related to Raft.
439-
query.plugin = ""
440-
}
441-
}
442-
}
443-
```
444365

445366
## Read Side
446367

@@ -563,26 +484,8 @@ You can set an arbitrary value however you cannot change the value easily after
563484

564485
### Configuration
565486

566-
On the read side, there are the following settings.
487+
On the read side, the related settings are defined at `lerna.akka.entityreplication.raft.eventsourced` in [reference.conf](/src/main/resources/reference.conf).
567488

568-
```hocon
569-
lerna.akka.entityreplication.raft.eventsourced {
570-
// Settings for saving committed events from each RaftActor
571-
commit-log-store {
572-
// Retry setting to prevent events from being lost if commit-log-store(sharding) stops temporarily
573-
retry {
574-
attempts = 15
575-
delay = 3 seconds
576-
}
577-
}
578-
579-
persistence {
580-
// Absolute path to the journal plugin configuration entry.
581-
// The journal stores Raft-committed events.
582-
journal.plugin = ""
583-
}
584-
}
585-
```
586489

587490
## Persistence plugin configuration
588491

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# It is safe to exclude the following since these classes are package private.
2+
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.eventsourced.CommitLogStore")
3+
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.eventsourced.ShardedCommitLogStore")

src/main/protobuf/cluster_replication.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,16 @@ message CommitLogStoreActorState {
8181
required LogEntryIndex current_index = 1;
8282
}
8383

84+
message CommitLogStoreAppendCommittedEntries {
85+
required NormalizedShardId shard_id = 1;
86+
repeated LogEntry entries = 2;
87+
}
88+
89+
message CommitLogStoreAppendCommittedEntriesResponse {
90+
required LogEntryIndex current_index = 1;
91+
}
92+
93+
8494
// ===
8595
// raft.protocol
8696
// ===

src/main/resources/reference.conf

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ lerna.akka.entityreplication {
4242

4343
// Preserving log entries from log reduction to avoid log replication failure.
4444
// If more number of logs than this value cannot be synchronized, the raft member will be unavailable.
45-
// It is recommended to set this value even less than log-size-threshold. Otherwise compaction will be run at every log-size-check-interval.
45+
// This value should be less than `log-size-threshold` and greater than 0. Otherwise, instantiating RaftSettings will fail.
4646
preserve-log-size = 10000
4747

4848
// Time to keep a cache of snapshots in memory
@@ -114,14 +114,25 @@ lerna.akka.entityreplication {
114114
}
115115

116116
raft.eventsourced {
117-
// Settings for saving committed events from each RaftActor
118-
commit-log-store {
119-
// Retry setting to prevent events from being lost if commit-log-store(sharding) stops temporarily
120-
retry {
121-
attempts = 15
122-
delay = 3 seconds
123-
}
124-
}
117+
118+
// Interval in which Raft Leader checks its committed log entries
119+
//
120+
// When new committed log entries are available, the leader sends these new entries to event-sourcing store(a.k.a. CommitLogStore).
121+
// This interval should be larger enough than network latencies since CommitLogStore might run on another node not running the leader.
122+
// If this interval is smaller than such latencies, the leader sends the same entry multiple times, which causes network resource inefficiency.
123+
committed-log-entries-check-interval = 100ms
124+
125+
// Maximum number of entries AppendCommittedEntries contains.
126+
// The default value is the same as `raft.max-append-entries-size`.
127+
// A too-large value might cause message serialization failure.
128+
max-append-committed-entries-size = ${lerna.akka.entityreplication.raft.max-append-entries-size}
129+
130+
// Maximum number of AppendCommittedEntries to send at once at every `committed-log-entries-check-interval`.
131+
// The default value is the same as `raft.max-append-entries-batch-size`.
132+
// If there are many not-persisted committed entries,
133+
// * A too-large value might cause temporary network overload
134+
// * A too-small value might cause event-sourcing to take more time to catch up on the latest.
135+
max-append-committed-entries-batch-size = ${lerna.akka.entityreplication.raft.max-append-entries-batch-size}
125136

126137
persistence {
127138
// Absolute path to the journal plugin configuration entry.

src/main/scala/lerna/akka/entityreplication/ClusterReplication.scala

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package lerna.akka.entityreplication
33
import akka.actor.{ Actor, ActorRef, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider, Props, Status }
44
import lerna.akka.entityreplication.ClusterReplication.EntityPropsProvider
55
import lerna.akka.entityreplication.model.TypeName
6-
import lerna.akka.entityreplication.raft.eventsourced.{ CommitLogStore, ShardedCommitLogStore }
6+
import lerna.akka.entityreplication.raft.eventsourced.CommitLogStoreActor
77
import lerna.akka.entityreplication.util.ActorIds
88
import akka.util.Timeout
99
import akka.pattern.ask
@@ -92,12 +92,8 @@ private[entityreplication] class ClusterReplicationGuardian extends Actor {
9292
val _typeName = TypeName.from(typeName)
9393
val regionName = ActorIds.actorName(_typeName.underlying)
9494

95-
val maybeCommitLogStore: Option[CommitLogStore] = {
96-
// TODO: RMUの有効無効をconfigから指定
97-
val enabled = true // FIXME: settings から取得する (typeName ごとに切り替えられる必要あり)
98-
// TODO: テストのために差し替え出来るようにする
99-
Option.when(enabled)(new ShardedCommitLogStore(_typeName, context.system, settings))
100-
}
95+
val commitLogStore: ActorRef =
96+
CommitLogStoreActor.startClusterSharding(_typeName, context.system, settings)
10197

10298
val regionRef: ActorRef =
10399
context.child(regionName) match {
@@ -111,7 +107,7 @@ private[entityreplication] class ClusterReplicationGuardian extends Actor {
111107
extractEntityId,
112108
extractShardId,
113109
possibleShardIds,
114-
maybeCommitLogStore,
110+
commitLogStore,
115111
),
116112
regionName,
117113
)

0 commit comments

Comments
 (0)