
Commit 826f7be

finalize CQRS architecture
1 parent 5cc8814 commit 826f7be

123 files changed: 4655 additions, 1782 deletions


README.md

Lines changed: 61 additions & 12 deletions
@@ -1,12 +1,13 @@
 # generic-persistence-api
-a CQRS/ES framework in scala using akka persistence
 
-![](docs/diagrams/src/CQRS.svg)
+a **CQRS/ES** framework written in scala using akka persistence
+
+![CQRS](docs/diagrams/src/CQRS.svg)
 
 "CQRS is simply the creation of two objects where there was previously only one. The separation occurs based upon whether the methods are a command or a query (the same definition that is used by Meyer in Command and Query Separation: a command is any method that mutates state and a query is any method that returns a value)."
 —Greg Young, CQRS, Task Based UIs, Event Sourcing agh!
 
-## Write Side
+## CQRS Write Side
 
 **generic-persistence-api** relies on [Akka Persistence](https://doc.akka.io/docs/akka/current/typed/persistence.html) to provide a **scalable** and **resilient** way to implement the write-side of CQRS using commands and events that simplify the implementation of event-sourced systems.
 
@@ -42,7 +43,7 @@ The write-side using Akka **EventSourcedBehavior** typically involves the follow
 3. **Event replay**: When the actor is created or restarted, Akka Persistence automatically replays all the events from the event log, allowing the actor to rebuild its current state based on the events.
 4. **Snapshotting**: To avoid replaying all events every time the actor is created or restarted, Akka EventSourcedBehavior supports snapshotting. Every N events or when a given predicate of the state is fulfilled, the actor can save a **snapshot** of its current state, including any accumulated events and their tags after the previous snapshot. This snapshot is persisted alongside the event log and can be used to restore the state of the actor at a later point in time, rather than replaying all events from the beginning.
 
-"Entity Behavior" is a **specialized factory** that relies on Cluster Sharding to create instances of "EventSourceBehavior" actors, each of which corresponds to a single actor entity managed by a sharding region.
+"Entity Behavior" is a **specialized factory** that relies on Cluster Sharding to create instances of "EventSourcedBehavior" actors, each of which corresponds to a single actor entity managed by a sharding region.
 
 It must define the type of command and event classes that the behavior will handle along with the state class that will hold its in-memory state.
 
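The four steps above map directly onto Akka's `EventSourcedBehavior` API. A minimal sketch of that mapping (the `Person*` names and the snapshot cadence are illustrative, not code from this commit):

```scala
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior, RetentionCriteria}

// Hypothetical command/event/state types, for illustration only
sealed trait PersonCommand
final case class AddPerson(name: String) extends PersonCommand

sealed trait PersonEvent
final case class PersonAdded(name: String) extends PersonEvent

final case class PersonState(names: Set[String] = Set.empty)

object PersonBehavior {
  def apply(entityId: String): EventSourcedBehavior[PersonCommand, PersonEvent, PersonState] =
    EventSourcedBehavior[PersonCommand, PersonEvent, PersonState](
      persistenceId = PersistenceId.ofUniqueId(entityId),
      emptyState = PersonState(),
      // 1./2. command handling: validate the command and persist the resulting event(s)
      commandHandler = (_, command) =>
        command match {
          case AddPerson(name) => Effect.persist(PersonAdded(name))
        },
      // 3. event replay: the same handler rebuilds the state when events are replayed
      eventHandler = (state, event) =>
        event match {
          case PersonAdded(name) => state.copy(names = state.names + name)
        }
    )
      // 4. snapshotting: save a snapshot every 100 events, keep the last two
      .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
}
```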
@@ -172,7 +173,7 @@ The resulting system is highly efficient, with the ability to quickly rebuild it
 
 Moreover, providing the ability to **tag events** enables **read-side projections** to be easily implemented and maintained, improving the overall performance and scalability of the system.
 
-![](docs/diagrams/out/EntityBehavior.svg)
+![EntityBehavior](docs/diagrams/out/EntityBehavior.svg)
 
 ### Entity Pattern
 
@@ -219,8 +220,7 @@ It exposes several methods to allow a sender to communicate with actor entities.
 
 ````
 
-![](docs/diagrams/out/Patterns.svg)
-
+![Patterns](docs/diagrams/out/Patterns.svg)
 
 ### Commands
 
@@ -230,7 +230,7 @@ Command responses play also an important role in providing feedback to the user
 
 The framework defines the trait **_Command_** as the root interface of any command sent to a recipient in the system, and the trait **_CommandResult_** as the root interface of any command response.
 
-![](docs/diagrams/out/Command.svg)
+![Command](docs/diagrams/out/Command.svg)
 
 ### Events
 
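To make the **_Command_**/**_CommandResult_** pair concrete, here is a sketch of a possible command hierarchy. The trait stubs stand in for the framework's root traits (whose package paths are not visible in this diff), and the `Person*` names are hypothetical:

```scala
// Stand-ins for the framework's root traits described above
trait Command
trait CommandResult

// Hypothetical commands sent to a Person entity
sealed trait PersonCommand extends Command
final case class CreatePerson(name: String) extends PersonCommand
case object LoadPerson extends PersonCommand

// Hypothetical command responses, one per outcome the sender cares about
sealed trait PersonCommandResult extends CommandResult
final case class PersonCreated(uuid: String) extends PersonCommandResult
final case class PersonLoaded(name: String) extends PersonCommandResult
final case class PersonError(message: String) extends PersonCommandResult
```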
@@ -240,7 +240,7 @@ If the write-side raises an event whenever the state of the application changes,
 
 The framework defines the trait **_Event_** as the root interface of any event in the system.
 
-![](docs/diagrams/out/Event.svg)
+![Event](docs/diagrams/out/Event.svg)
 
 ### State
 
@@ -254,7 +254,7 @@ When an entity actor is created, it starts with an empty in-memory state and per
 
 The framework defines the trait **_State_** as the root interface of the in-memory state of any entity actor.
 
-![](docs/diagrams/out/State.svg)
+![State](docs/diagrams/out/State.svg)
 
 ### Serialization
 
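The **_Event_** and **_State_** traits described above pair naturally: the in-memory state is rebuilt by folding events over an empty value. An illustrative sketch (the trait stubs and `Person*` names are assumptions, not framework code):

```scala
// Stand-ins for the framework's root traits
trait Event
trait State

// Hypothetical events recording every change to a Person entity
sealed trait PersonEvent extends Event
final case class PersonCreated(uuid: String, name: String) extends PersonEvent
final case class PersonRenamed(uuid: String, name: String) extends PersonEvent

// Hypothetical in-memory state, updated by applying one event at a time
final case class PersonState(persons: Map[String, String] = Map.empty) extends State {
  def updated(event: PersonEvent): PersonState =
    event match {
      case PersonCreated(uuid, name) => copy(persons = persons + (uuid -> name))
      case PersonRenamed(uuid, name) => copy(persons = persons + (uuid -> name))
    }
}

object Replay {
  // Replaying the event log rebuilds the current state
  val current: PersonState =
    List(PersonCreated("1", "Ada"), PersonRenamed("1", "Ada Lovelace"))
      .foldLeft(PersonState())(_ updated _)
}
```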
@@ -265,6 +265,7 @@ When an Akka actor receives a command, it creates one or more domain events and
 Finally, before persisting a snapshot, the current state of the actor (its in-memory state) is serialized using the configured serialization mechanism.
 
 The framework natively supports three types of **Serializers**:
+
 + **proto**
 + **jackson-cbor**
 + **chill**
@@ -311,7 +312,7 @@ akka {
 }
 ```
 
-![](docs/diagrams/out/Serialization.svg)
+![Serialization](docs/diagrams/out/Serialization.svg)
 
 #### Versioning
 
@@ -320,5 +321,53 @@ You must consider how your system will be able to handle multiple versions of an
 
 ### Event Sourcing
 
-![](docs/diagrams/out/EventSourcing.svg)
+![EventSourcing](docs/diagrams/out/EventSourcing.svg)
+
+## CQRS Read Side
+
+Although it is easy to load the current state of an object by replaying its event stream (or its state at some point in the past), it is difficult or expensive to run a query such as "find all my orders where the total value is greater than $250".
+By implementing the CQRS pattern, such queries will typically be executed on the read side, where you can build data projections that are specifically designed to answer such questions.
+
+The read side of CQRS is responsible for providing fast and efficient read access to the data that has been written by the write side.
+
+It can be implemented in various ways, but a common pattern is to use event sourcing. In this approach, the read side subscribes to the event stream published by the write side and processes the events to build and update the read models.
+
+This approach is implemented in the framework through the "Event Processor Stream".
+
+### Event Processor Stream
+
+The "Event Processor Stream" implementation of the read side of CQRS subscribes to specific events based on their tags, which are published by the write side through event sourcing. By subscribing to these events, the read side receives and processes only the events that are relevant to its domain.
+
+To ensure that each event is read only once per stream processor, the read side maintains an offset for each stream processor: a pointer to the last event that processor handled. This offset handling is a critical component of the implementation; the offset is updated each time an event is processed, so that each stream processor can resume from the point where it left off in case of failure or restart.
+
+The read side uses the event tags to filter the event stream, so that only relevant events are processed. This allows the read side to update its models in real time based on the events published by the write side, while avoiding duplicated or missed events.
+
+Overall, the "Event Processor Stream" implementation of the read side of CQRS provides an efficient and scalable way to handle a large volume of events, while ensuring that each event is processed only once per stream processor and that the read models remain up-to-date and consistent with the write side.
+
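A sketch of the subscription this section describes, using Akka Persistence Query's `eventsByTag`. The JDBC read journal, tag name and processor id are assumptions, and the projection and offset store are reduced to stubs:

```scala
import akka.actor.typed.ActorSystem
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
import akka.persistence.query.{EventEnvelope, Offset, PersistenceQuery}
import akka.stream.scaladsl.Sink

import scala.concurrent.Future

object PersonEventProcessorStream {

  // Hypothetical offset store: one offset per stream processor
  def loadOffset(processorId: String): Offset = Offset.noOffset
  def saveOffset(processorId: String, offset: Offset): Future[Unit] = Future.successful(())

  // Hypothetical projection of one event into the read model
  def project(envelope: EventEnvelope): Future[Unit] = Future.successful(())

  def run(implicit system: ActorSystem[_]): Unit = {
    import system.executionContext
    val readJournal =
      PersistenceQuery(system).readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)

    readJournal
      // only events tagged "person" by the write side reach this processor
      .eventsByTag("person", loadOffset("person-processor"))
      .mapAsync(parallelism = 1) { envelope =>
        // project the event, then record its offset so processing can resume here
        project(envelope).flatMap(_ => saveOffset("person-processor", envelope.offset))
      }
      .runWith(Sink.ignore)
  }
}
```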
+### Offset Provider
+
+When reading events from a stream, it is important to keep track of the position in the stream where the last event was read. This position is typically referred to as the "offset". The offset can be used to ensure that each event in the stream is read only once, even if the stream is read multiple times.
+
+Here is how an offset is used to read each event just once from a specific event stream (a sketch in code follows this section):
+
+1. Initialize offset: The first time the stream is read, the offset is set to the beginning of the stream.
+2. Read events: The application reads events from the event stream, starting from the current offset.
+3. Process event: The application processes each event it reads.
+4. Update offset: Once an event has been processed, the application updates the offset to the position of that event.
+5. Repeat: The application repeats the process of reading events, starting from the updated offset.
+
+By keeping track of the offset, the application can ensure that each event in the stream is read only once. If the application crashes or is restarted, it can resume reading from the last known offset, ensuring that no events are missed or duplicated.
+
+In some event sourcing systems, the offset is stored along with the event data, making it easy to resume reading from the last known offset. In other systems, the offset is stored separately, for example in a database or a distributed cache. It is this second approach that was chosen and implemented within the framework.
+
+Overall, the use of an offset is a key technique in event sourcing systems, as it allows events to be read just once from a specific event stream, ensuring data consistency and avoiding duplicate processing.
+
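The five steps above, condensed into code. This is a sketch under the assumption of an external offset store (the second approach mentioned above); it does not show the framework's actual offset provider API, which is not visible in this diff:

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical abstractions, for illustration only
trait OffsetStore {
  def load(processorId: String): Future[Long]                // 1. initialize / resume the offset
  def save(processorId: String, offset: Long): Future[Unit]  // 4. update the offset
}

trait EventStream {
  def read(fromOffset: Long, max: Int): Future[Seq[(Long, Any)]] // 2. read events with their offsets
}

class StreamProcessor(id: String, events: EventStream, offsets: OffsetStore)(implicit ec: ExecutionContext) {

  // 3. process one event (stub)
  def process(event: Any): Future[Unit] = Future.successful(())

  // 5. repeat: read from the stored offset, process in order, persist the new offset, recurse
  // (a real implementation would back off instead of looping immediately on an empty batch)
  def run(): Future[Unit] =
    offsets.load(id).flatMap { from =>
      events.read(from, max = 100).flatMap { batch =>
        batch.foldLeft(Future.successful(())) { case (acc, (offset, event)) =>
          acc.flatMap(_ => process(event)).flatMap(_ => offsets.save(id, offset))
        }
      }.flatMap(_ => run())
    }
}
```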
+### Journal Provider
+
+## Integration with other subsystems
+
+Events provide a useful way of communicating with other subsystems. Your event store can publish events to notify other interested subsystems of changes to the application's state. Again, the event store provides a complete record of all the events that it published to other systems.
 
+## Quick Start - Person Sample

build.sbt

Lines changed: 11 additions & 19 deletions
@@ -30,7 +30,7 @@ ThisBuild / organization := "app.softnetwork"
 
 name := "generic-persistence-api"
 
-ThisBuild / version := "0.2.6.2"
+ThisBuild / version := "0.3.0"
 
 ThisBuild / scalaVersion := "2.12.11"
 
@@ -81,13 +81,6 @@ lazy val coreTestkit = project.in(file("core/testkit"))
     commonTestkit % "compile->compile;test->test;it->it"
   )
 
-lazy val schema = project.in(file("jdbc/schema"))
-  .configs(IntegrationTest)
-  .settings(Defaults.itSettings)
-  .dependsOn(
-    jdbc % "compile->compile;test->test;it->it"
-  )
-
 lazy val jdbc = project.in(file("jdbc"))
   .configs(IntegrationTest)
   .settings(Defaults.itSettings)
@@ -116,29 +109,29 @@ lazy val counter = project.in(file("counter"))
     coreTestkit % "test->test;it->it"
   )
 
-lazy val elasticTestkit = project.in(file("elastic/testkit"))
+lazy val elastic = project.in(file("elastic"))
   .configs(IntegrationTest)
   .settings(Defaults.itSettings/*, pbSettings*/)
   .enablePlugins(AkkaGrpcPlugin)
   .dependsOn(
-    common % "compile->compile;test->test;it->it"
-  )
-  .dependsOn(
-    commonTestkit % "compile->compile;test->test;it->it"
+    core % "compile->compile;test->test;it->it"
   )
 
-lazy val elastic = project.in(file("elastic"))
+lazy val elasticTestkit = project.in(file("elastic/testkit"))
   .configs(IntegrationTest)
   .settings(Defaults.itSettings/*, pbSettings*/)
   .enablePlugins(AkkaGrpcPlugin)
   .dependsOn(
-    core % "compile->compile;test->test;it->it"
+    elastic % "compile->compile;test->test;it->it"
   )
   .dependsOn(
-    coreTestkit % "test->test;it->it"
+    commonTestkit % "compile->compile;test->test;it->it"
  )
   .dependsOn(
-    elasticTestkit % "test->test;it->it"
+    coreTestkit % "compile->compile;test->test;it->it"
+  )
+  .dependsOn(
+    jdbcTestkit % "compile->compile;test->test;it->it"
   )
 
 lazy val kv = project.in(file("kv"))
@@ -159,11 +152,10 @@ lazy val root = project.in(file("."))
     core,
     coreTestkit,
     jdbc,
-    schema,
     jdbcTestkit,
     counter,
-    elasticTestkit,
     elastic,
+    elasticTestkit,
     kv
   )
   .configs(IntegrationTest)
Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+package app.softnetwork
+
+import java.io.InputStream
+import scala.io.Source
+import scala.language.implicitConversions
+
+package object io {
+  implicit def inputStreamToString(is: InputStream): String = Source.fromInputStream(is).mkString
+}
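
With this implicit conversion in scope, any `InputStream` can be consumed as a `String`. A small usage sketch (the sample data is arbitrary):

```scala
import app.softnetwork.io._

import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets

object InputStreamToStringExample extends App {
  val is = new ByteArrayInputStream("hello".getBytes(StandardCharsets.UTF_8))
  // the implicit conversion drains the stream into a String
  val content: String = is
  println(content) // hello
}
```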

common/src/main/scala/app/softnetwork/serialization/JavaTimeSerializers.scala

Lines changed: 23 additions & 2 deletions
@@ -1,7 +1,6 @@
 package app.softnetwork.serialization
 
 import java.{time => jt}
-
 import org.json4s.CustomSerializer
 import org.json4s.JsonAST.JString
 
@@ -13,9 +12,31 @@ import scala.util.Try
   * smanciot
   */
 object JavaTimeSerializers {
-  def all = Seq(LocalDateTimeISOSerializer, LocalDateISOSerializer, ZonedDateTimeISOSerializer)
+  def all = Seq(
+    LocalDateTimeISOSerializer,
+    LocalDateISOSerializer,
+    ZonedDateTimeISOSerializer,
+    InstantISOSerializer
+  )
 }
 
+case object InstantISOSerializer
+    extends CustomSerializer[jt.Instant](_ => {
+
+      val isoFormat = jt.format.DateTimeFormatter.ISO_DATE_TIME
+      def isValidInstant(str: String): Boolean = Try(isoFormat.parse(str)).isSuccess
+
+      (
+        {
+          case JString(value) if isValidInstant(value) =>
+            jt.LocalDateTime.parse(value, isoFormat).atZone(jt.ZoneId.systemDefault()).toInstant
+        },
+        { case ldt: jt.Instant =>
+          JString(jt.format.DateTimeFormatter.ISO_INSTANT.format(ldt))
+        }
+      )
+    })
+
 case object LocalDateTimeISOSerializer
   extends CustomSerializer[jt.LocalDateTime](_ => {
 
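With `InstantISOSerializer` added to `JavaTimeSerializers.all`, `java.time.Instant` fields can be written through json4s as ISO-8601 strings. A wiring sketch (the `Ping` case class is hypothetical; `DefaultFormats` and `Serialization` are standard json4s):

```scala
import app.softnetwork.serialization.JavaTimeSerializers
import org.json4s.jackson.Serialization
import org.json4s.{DefaultFormats, Formats}

import java.time.Instant

final case class Ping(at: Instant)

object InstantSerializationExample extends App {
  // register the java.time serializers alongside the json4s defaults
  implicit val formats: Formats = DefaultFormats ++ JavaTimeSerializers.all

  // Instant values are written as ISO-8601 strings, e.g. {"at":"2023-01-01T00:00:00Z"}
  println(Serialization.write(Ping(Instant.now())))
}
```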
common/src/main/scala/app/softnetwork/utils/ClasspathResources.scala

Lines changed: 15 additions & 14 deletions
@@ -16,28 +16,29 @@
 
 package app.softnetwork.utils
 
-import com.typesafe.scalalogging.StrictLogging
+import app.softnetwork.io._
+import org.slf4j.{Logger, LoggerFactory}
 
 import java.io.InputStream
-import scala.io.{Source => ScalaIOSource}
 
-object ClasspathResources extends ClasspathResources
+object ClasspathResources extends ClasspathResources{
+  lazy val log: Logger = LoggerFactory getLogger getClass.getName
+}
 
-trait ClasspathResources extends StrictLogging {
+trait ClasspathResources {
 
-  def streamToString(is: InputStream): String = ScalaIOSource.fromInputStream(is).mkString
+  def log: Logger
 
-  def fromClasspathAsString(fileName: String): String = streamToString(
-    fromClasspathAsStream(fileName)
-  )
+  def fromClasspathAsString(file: String): Option[String] =
+    fromClasspathAsStream(file).map(inputStreamToString)
 
-  def fromClasspathAsStream(fileName: String): InputStream = Option(
-    getClass.getClassLoader.getResourceAsStream(fileName)
+  def fromClasspathAsStream(file: String): Option[InputStream] = Option(
+    getClass.getClassLoader.getResourceAsStream(file)
   ) match {
-    case Some(i) => i
-    case _ =>
-      logger.error(s"file $fileName not found in the classpath")
-      null
+    case None =>
+      log.error(s"file $file not found in the classpath")
+      None
+    case some => some
   }
 
 }
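
The reworked API returns `Option` instead of `null`, so call sites pattern-match rather than null-check. A usage sketch (the resource name is arbitrary):

```scala
import app.softnetwork.utils.ClasspathResources

object ClasspathExample extends App {
  // None is returned (and an error is logged) when the resource is missing
  ClasspathResources.fromClasspathAsString("reference.conf") match {
    case Some(content) => println(content.take(80))
    case None          => println("resource not found on the classpath")
  }
}
```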

common/testkit/build.sbt

Lines changed: 1 addition & 8 deletions
@@ -10,11 +10,4 @@ val scalatest = Seq(
   "org.scalatest" %% "scalatest" % Versions.scalatest
 )
 
-val dockerTestKit = Seq(
-  "com.whisk" %% "docker-testkit-scalatest" % Versions.dockerTestKit exclude ("org.scalatest", "scalatest"),
-  "com.whisk" %% "docker-testkit-impl-docker-java" % Versions.dockerTestKit exclude ("org.apache.httpcomponents", "httpclient"),
-  "com.whisk" %% "docker-testkit-config" % Versions.dockerTestKit,
-  "com.whisk" %% "docker-testkit-impl-spotify" % Versions.dockerTestKit
-)
-
-libraryDependencies ++= scalatest ++ dockerTestKit
+libraryDependencies ++= scalatest

common/testkit/src/main/scala/app/softnetwork/concurrent/scalatest/CompletionTestKit.scala

Lines changed: 4 additions & 5 deletions
@@ -1,18 +1,19 @@
 package app.softnetwork.concurrent.scalatest
 
 import app.softnetwork.concurrent.Completion
-import com.typesafe.scalalogging.Logger
 import org.scalatest._
-import org.slf4j.LoggerFactory
+import org.slf4j.Logger
 
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
 import scala.language.implicitConversions
 import scala.util.{Failure, Success, Try}
 
+import scala.language.reflectiveCalls
+
 /** Created by smanciot on 12/04/2021.
   */
-trait CompletionTestKit extends Completion with Assertions {
+trait CompletionTestKit extends Completion with Assertions { _: { def log: Logger } =>
 
   implicit class AwaitAssertion[T](future: Future[T])(implicit atMost: Duration = defaultTimeout) {
     def assert(fun: T => Assertion): Assertion =
@@ -33,8 +34,6 @@ trait CompletionTestKit extends Completion with Assertions {
 
   override implicit def toSeq[T](t: Try[Seq[T]]): Seq[T] = toT[Seq[T]](t)
 
-  def log: Logger = Logger(LoggerFactory.getLogger(getClass.getName))
-
   def blockUntil(explain: String, maxTries: Int = 20, sleep: Int = 1000)(
     predicate: () => Boolean
   ): Unit = {
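
Since `log` moved from a concrete member into a structural self-type, every suite mixing in `CompletionTestKit` must now supply its own `Logger`. A minimal sketch (the suite name is hypothetical, assuming ScalaTest 3.1+ naming):

```scala
import app.softnetwork.concurrent.scalatest.CompletionTestKit
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.{Logger, LoggerFactory}

import scala.concurrent.Future

class SampleSpec extends AnyWordSpecLike with CompletionTestKit {

  // satisfies the structural self-type { def log: Logger }
  lazy val log: Logger = LoggerFactory.getLogger(getClass.getName)

  "a future" should {
    "complete with the expected value" in {
      Future.successful(42) assert (result => assertResult(42)(result))
    }
  }
}
```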
