Skip to content

Commit 4627246

Browse files
authored
Merge branch 'master' into refactor-snapshot-store-failure-spec
2 parents 5160e35 + c78b5c7 commit 4627246

File tree

13 files changed

+169
-9
lines changed

13 files changed

+169
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1515
[PR#164](https://github.com/lerna-stack/akka-entity-replication/pull/164)
1616
- Add function extracting shard id from entity id to lerna.akka.entityreplication.typed.ClusterReplication
1717
[PR#172](https://github.com/lerna-stack/akka-entity-replication/pull/172)
18+
- Add function of Disabling raft actor [PR#173](https://github.com/lerna-stack/akka-entity-replication/pull/173)
1819

1920
### Fixed
2021
- RaftActor might delete committed entries

src/main/resources/reference.conf

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ lerna.akka.entityreplication {
2222
// Changing this value will cause data inconsistency.
2323
number-of-shards = 100
2424

25+
// Shard Ids of raft actors to disable.
26+
// e.g. ["2", "5"]
27+
disabled-shards = []
28+
2529
// Maximum number of entries which AppendEntries contains.
2630
// The too large size will cause message serialization failure.
2731
max-append-entries-size = 16
@@ -111,6 +115,7 @@ lerna.akka.entityreplication {
111115
// Snapshot synchronization reads events that related to Raft.
112116
query.plugin = ""
113117
}
118+
114119
}
115120

116121
raft.eventsourced {

src/main/scala/lerna/akka/entityreplication/ReplicationRegion.scala

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ private[entityreplication] class ReplicationRegion(
137137
private[this] val regions: Map[MemberIndex, mutable.Set[Member]] =
138138
allMemberIndexes.map(i => i -> mutable.Set.empty[Member]).toMap
139139

140+
private val disabledShards: Set[ShardId] = settings.raftSettings.disabledShards
141+
140142
// TODO 変数名を実態にあったものに変更
141143
private[this] val shardingRouters: Map[MemberIndex, ActorRef] = allMemberIndexes.map { memberIndex =>
142144
def clusterReplicationShardId(message: Any): String = extractNormalizedShardIdInternal(message).raw
@@ -255,11 +257,19 @@ private[entityreplication] class ReplicationRegion(
255257
def deliverMessage(message: Any): Unit = {
256258
if (extractShardId.isDefinedAt(message)) {
257259
val shardId = extractShardId(message)
258-
shardingRouters.values.foreach(
259-
// Don't forward StartEntity to prevent leaking StartEntityAck
260-
_.tell(ShardRegion.StartEntity(shardId), context.system.deadLetters),
261-
)
262-
handleRoutingCommand(DeliverSomewhere(Command(message)))
260+
if (!disabledShards.contains(shardId)) {
261+
shardingRouters.values.foreach(
262+
// Don't forward StartEntity to prevent leaking StartEntityAck
263+
_.tell(ShardRegion.StartEntity(shardId), context.system.deadLetters),
264+
)
265+
handleRoutingCommand(DeliverSomewhere(Command(message)))
266+
} else if (log.isWarningEnabled) {
267+
log.warning(
268+
s"Following command had sent to disabled shards was dropped: {}(shardId={})",
269+
message.getClass.getName,
270+
shardId,
271+
)
272+
}
263273
} else {
264274
if (log.isWarningEnabled)
265275
log.warning("The message [{}] was dropped because its shard ID could not be extracted", message)

src/main/scala/lerna/akka/entityreplication/ReplicationRegionRaftActorStarter.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ private[entityreplication] object ReplicationRegionRaftActorStarter {
3535
): Behavior[Nothing] = {
3636
Behaviors
3737
.setup[Command] { context =>
38+
val (disableIds, enableIds) = ids.partition(settings.disabledShards.contains)
39+
if (disableIds.nonEmpty) {
40+
context.log.info(
41+
s"Following disabled shard ids are excluded from ids which is scheduled launching: ${disableIds}",
42+
)
43+
}
3844
val startEntityAckAdapter =
3945
context.messageAdapter[ShardRegion.StartEntityAck](ack => ClassicStartEntityAck(ack.entityId))
4046
Behaviors.withTimers { timers =>
@@ -46,7 +52,7 @@ private[entityreplication] object ReplicationRegionRaftActorStarter {
4652
startEntityAckAdapter,
4753
settings.raftActorAutoStartNumberOfActors,
4854
)
49-
starter.behavior(ids, Set.empty)
55+
starter.behavior(enableIds, Set.empty)
5056
}
5157
}.narrow[Nothing]
5258
}

src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package lerna.akka.entityreplication.raft
22

33
import akka.actor.{ ActorRef, Cancellable, Props, Stash }
4-
import akka.persistence.RuntimePluginConfig
4+
import akka.persistence.{ Recovery, RuntimePluginConfig }
55
import com.typesafe.config.{ Config, ConfigFactory }
66
import lerna.akka.entityreplication.ClusterReplication.EntityPropsProvider
77
import lerna.akka.entityreplication.ReplicationRegion.Msg
@@ -214,6 +214,22 @@ private[raft] class RaftActor(
214214

215215
override def snapshotPluginConfig: Config = ConfigFactory.empty()
216216

217+
private def isDisabled: Boolean = settings.disabledShards.contains(shardId.raw)
218+
219+
override def recovery: Recovery = {
220+
if (isDisabled) {
221+
Recovery.none
222+
} else {
223+
Recovery()
224+
}
225+
}
226+
227+
override def preStart(): Unit = {
228+
if (isDisabled) {
229+
context.stop(self)
230+
}
231+
}
232+
217233
val numberOfMembers: Int = settings.replicationFactor
218234

219235
@nowarn("msg=Use RaftMemberData.truncateAndAppendEntries instead.")

src/main/scala/lerna/akka/entityreplication/raft/RaftSettings.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ trait RaftSettings {
2828

2929
def numberOfShards: Int
3030

31+
def disabledShards: Set[String]
32+
3133
def maxAppendEntriesSize: Int
3234

3335
def maxAppendEntriesBatchSize: Int

src/main/scala/lerna/akka/entityreplication/raft/RaftSettingsImpl.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ private[entityreplication] final case class RaftSettingsImpl(
1717
replicationFactor: Int,
1818
quorumSize: Int,
1919
numberOfShards: Int,
20+
disabledShards: Set[String],
2021
maxAppendEntriesSize: Int,
2122
maxAppendEntriesBatchSize: Int,
2223
compactionSnapshotCacheTimeToLive: FiniteDuration,
@@ -215,6 +216,8 @@ private[entityreplication] object RaftSettingsImpl {
215216
s"snapshot-every ($eventSourcedSnapshotEvery) should be greater than 0.",
216217
)
217218

219+
val disabledShards: Set[String] = config.getStringList("disabled-shards").asScala.toSet
220+
218221
RaftSettingsImpl(
219222
config = config,
220223
electionTimeout = electionTimeout,
@@ -224,6 +227,7 @@ private[entityreplication] object RaftSettingsImpl {
224227
replicationFactor = replicationFactor,
225228
quorumSize = quorumSize,
226229
numberOfShards = numberOfShards,
230+
disabledShards = disabledShards,
227231
maxAppendEntriesSize = maxAppendEntriesSize,
228232
maxAppendEntriesBatchSize = maxAppendEntriesBatchSize,
229233
compactionSnapshotCacheTimeToLive = compactionSnapshotCacheTimeToLive,

src/main/scala/lerna/akka/entityreplication/raft/eventsourced/CommitLogStoreActor.scala

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,14 @@ import akka.Done
44
import akka.actor.{ ActorLogging, ActorRef, ActorSystem, Props }
55
import akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor
66
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings }
7-
import akka.persistence.{ PersistentActor, RecoveryCompleted, SaveSnapshotFailure, SaveSnapshotSuccess, SnapshotOffer }
7+
import akka.persistence.{
8+
PersistentActor,
9+
Recovery,
10+
RecoveryCompleted,
11+
SaveSnapshotFailure,
12+
SaveSnapshotSuccess,
13+
SnapshotOffer,
14+
}
815
import akka.util.ByteString
916
import lerna.akka.entityreplication.{ ClusterReplicationSerializable, ClusterReplicationSettings }
1017
import lerna.akka.entityreplication.model.{ NormalizedShardId, TypeName }
@@ -307,4 +314,19 @@ private[entityreplication] class CommitLogStoreActor(typeName: TypeName, setting
307314
}
308315
}
309316

317+
private def isDisabled: Boolean = settings.raftSettings.disabledShards.contains(shardId)
318+
319+
override def recovery: Recovery = {
320+
if (isDisabled) {
321+
Recovery.none
322+
} else {
323+
Recovery()
324+
}
325+
}
326+
327+
override def preStart(): Unit = {
328+
if (isDisabled) {
329+
context.stop(self)
330+
}
331+
}
310332
}

src/multi-jvm/scala/lerna/akka/entityreplication/ReplicationRegionSpec.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package lerna.akka.entityreplication
22

33
import akka.Done
4+
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
5+
import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps
46

57
import java.util.concurrent.atomic.AtomicInteger
68
import akka.actor.{ Actor, ActorRef, ActorSelection, DiagnosticActorLogging, Props, RootActorPath, Terminated }
@@ -120,6 +122,7 @@ object ReplicationRegionSpecConfig extends MultiNodeConfig {
120122
lerna.akka.entityreplication.raft.compaction.log-size-threshold = 2
121123
lerna.akka.entityreplication.raft.compaction.preserve-log-size = 1
122124
lerna.akka.entityreplication.raft.compaction.log-size-check-interval = 0.1s
125+
lerna.akka.entityreplication.raft.disabled-shards = ["12"]
123126
"""))
124127
.withValue(
125128
"lerna.akka.entityreplication.raft.multi-raft-roles",
@@ -334,6 +337,27 @@ class ReplicationRegionSpec extends MultiNodeSpec(ReplicationRegionSpecConfig) w
334337
}
335338
}
336339

340+
"drop all messages sent to disabled shards" in {
341+
val typeName = createSeqTypeName()
342+
343+
runOn(node4, node5, node6) {
344+
clusterReplication = createReplication(typeName)
345+
}
346+
enterBarrier("ReplicationRegion created")
347+
348+
// "disable-id"'s shard id is "12" and the shard id have been defined as disabled id.
349+
val entityId = "disabled-id"
350+
runOn(node4) {
351+
LoggingTestKit
352+
.warn(
353+
"Following command had sent to disabled shards was dropped: lerna.akka.entityreplication.ReplicationRegionSpec$DummyReplicationActor$Cmd(shardId=12)",
354+
).expect {
355+
clusterReplication ! Cmd(entityId)
356+
}(system.toTyped)
357+
}
358+
enterBarrier("Some command sent")
359+
}
360+
337361
"Broadcast" when {
338362

339363
"全 MemberIndex に配信される" in {

src/test/scala/lerna/akka/entityreplication/ReplicationRegionRaftActorStarterSpec.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,33 @@ final class ReplicationRegionRaftActorStarterSpec
7474

7575
}
7676

77+
"trigger all actor starts without disabled actor" in {
78+
val shardRegionProbe = TestProbe()
79+
80+
val customSettings = RaftSettings(
81+
ConfigFactory
82+
.parseString("""
83+
|lerna.akka.entityreplication.raft.disabled-shards = ["2"]
84+
|""".stripMargin).withFallback(system.settings.config),
85+
)
86+
val raftActorStarter =
87+
spawnRaftActorStarter(shardRegionProbe.ref, Set("1", "2", "3"), customSettings)
88+
89+
assume(customSettings.raftActorAutoStartNumberOfActors >= 3)
90+
91+
// The starter will stop at the end of this test.
92+
watch(raftActorStarter)
93+
94+
// The starter should trigger all actor starts without disabled shards.
95+
val startedRaftActorIds = (1 to 2).map { _ =>
96+
expectStartEntityAndThenAck(shardRegionProbe)
97+
}.toSet
98+
startedRaftActorIds shouldBe Set("1", "3")
99+
100+
// The starter should stop itself.
101+
expectTerminated(raftActorStarter)
102+
}
103+
77104
"retry all actor starts with no ACK" in {
78105

79106
val shardRegionProbe = TestProbe()

0 commit comments

Comments
 (0)