Skip to content

Commit 7d9b2df

Browse files
committed
update documentation + update EventProcessorStream in order to skip events that the processor is not interrested with
1 parent ec8356e commit 7d9b2df

File tree

7 files changed

+214
-137
lines changed

7 files changed

+214
-137
lines changed

README.md

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -337,39 +337,32 @@ The read side of CQRS is responsible for providing fast and efficient read acces
337337

338338
It can be implemented in various ways, but a common pattern is to use event sourcing. In this approach, the read side subscribes to the event stream published by the write side and processes the events to build and update the read models.
339339

340-
This approach is implemented in the framework through the "event Processor Stream"
340+
This approach is implemented in the framework through the "Event Processor Stream".
341341

342342
### Event Processor Stream
343343

344-
The "Event Processor Stream" implementation of the read side of CQRS subscribes to specific events based on their tags, which are published by the write side through event sourcing. By subscribing to these events, the read side can receive and process only the events that are relevant to its domain.
344+
The "Event Processor Stream" implementation of the read side of CQRS subscribes to specific events based on their **tags**, which are published by the write side through event sourcing. By subscribing to these events, the read side can receive and process only the events that are relevant to its domain.
345345

346-
To ensure that each event is read only once for each stream processor, the read side maintains an offset for each stream processor, which is a pointer to the last event that was processed by that processor. By storing this offset for each stream processor, the read side can resume processing events from the point where each stream processor left off, in case of failure or restart.
347-
348-
The read side uses the event tags to filter the event stream, so that only relevant events are processed. This allows the read side to update its models in real-time based on the events published by the write side, while avoiding duplication or missing events.
349-
350-
The offset handling for each stream processor is a critical component of the "Event Processor Stream" implementation, as it ensures that each event is processed only once for each stream processor. The read side updates the offset for each stream processor each time an event is processed, so that it can resume processing from the point where each stream processor left off in case of failure or restart.
351-
352-
Overall, the "Event Processor Stream" implementation of the read side of CQRS provides an efficient and scalable way to handle a large volume of events, while ensuring that each event is processed only once for each stream processor and that the read models remain up-to-date and consistent with the write side.
346+
When reading events from a stream, it is important to keep track of the position in the stream where the last event was read. This position is typically referred to as the "offset". The offset can be used to ensure that each event in the stream is read only once, even if the stream is read multiple times.
353347

354-
### Offset Provider
348+
To ensure that each event is read only once, the "Event Processor Stream" maintains an **offset** , which is a pointer to the last event that it **successfully** processed. By storing its offset, the processor can resume processing events from the point where it left off, in case of failure or restart. By keeping track of the offset, the "Event Stream Processor" _consumer_ can ensure that each event in the stream is read only once. If the application crashes or is restarted, it can resume reading from the last known offset, ensuring that no events are missed or duplicated.
355349

356-
When reading events from a stream, it is important to keep track of the position in the stream where the last event was read. This position is typically referred to as the "offset". The offset can be used to ensure that each event in the stream is read only once, even if the stream is read multiple times.
350+
Here's how the "Event Stream Processor" handles offsets:
357351

358-
Here's an example of how to use an offset to read an event just once from a specific event stream:
352+
1. **Initialize offset**: When the processor is (re)started, it queries the "Offset Provider" to retrieve its current offset corresponding to the last event it successfully processed in the event stream.
353+
2. **Read events**: The processor queries the "Journal Provider" for events that have a specific tag, starting from the current offset.
354+
3. **Process event**: The processor processes each event it read from the event stream.
355+
4. **Write offset**: Once the processor has successfully processed the event, it updates its offset with the latter's offset value.
359356

360-
1. Initialize offset: The first time the stream is read, the offset is set to the beginning of the stream.
361-
2. Read events: The application reads events from the event stream, starting from the current offset.
362-
3. Process event: The application processes each event it read.
363-
4. Update offset: Once the application has processed an event it read, it updates the offset to the position of the last event it read.
364-
5. Repeat: The application repeats the process of reading events, starting from the updated offset.
357+
Since distinct processors may read the same event stream, it is necessary to be able to uniquely distinguish their offset.
365358

366-
By keeping track of the offset, the application can ensure that each event in the stream is read only once. If the application crashes or is restarted, it can resume reading from the last known offset, ensuring that no events are missed or duplicated.
359+
Consequently, the "Event Processor Stream" is uniquely identified by a processor id and a specific tag.
367360

368-
In some event sourcing systems, the offset is stored along with the event data, making it easy to resume reading from the last known offset. In other systems, the offset is stored separately, for example in a database or distributed cache. It is this second approach that was chosen and implemented within the framework.
361+
![EventProcessorStream](docs/diagrams/out/EventProcessorStream.svg)
369362

370-
Overall, the use of an offset is a key technique in event sourcing systems, as it allows events to be read just once from a specific event stream, ensuring data consistency and avoiding duplicate processing.
363+
Overall, the "Event Processor Stream" implementation of the read side of CQRS provides an efficient and scalable way to handle a large volume of events, while ensuring that each event is processed only once for each stream processor and that the read models remain up-to-date and consistent with the write side.
371364

372-
### Journal Provider
365+
![CQRS-read-side](docs/diagrams/out/CQRS_read_side.svg)
373366

374367
## Integration with other subsystems
375368

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ ThisBuild / organization := "app.softnetwork"
3030

3131
name := "generic-persistence-api"
3232

33-
ThisBuild / version := "0.3.0"
33+
ThisBuild / version := "0.3.0.1"
3434

3535
ThisBuild / scalaVersion := "2.12.11"
3636

core/src/main/scala/app/softnetwork/persistence/query/EventProcessorStream.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,8 @@ trait EventProcessorStream[E <: Event] extends EventStream {
8080
eventEnvelope.sequenceNr
8181
).map(_ => eventEnvelope.offset)
8282
case other =>
83-
log.error("Unexpected event [{}]", other)
84-
Future.failed(
85-
new IllegalArgumentException(s"Unexpected event [${other.getClass.getName}]")
86-
)
83+
log.warn("Skipped unexpected event [{}]", other)
84+
Future.successful(eventEnvelope.offset)
8785
}
8886
}
8987
}
Lines changed: 44 additions & 0 deletions
Loading

docs/diagrams/out/EventProcessorStream.svg

Lines changed: 85 additions & 53 deletions
Loading
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
@startuml CQRS-read-side
2+
hide footbox
3+
skinparam ParticipantPadding 20
4+
skinparam BoxPadding 10
5+
skinparam sequenceMessageAlign center
6+
skinparam monochrome true
7+
8+
box#AntiqueWhite
9+
boundary "CQRS write side" as ebc
10+
end box
11+
12+
database Journal as j
13+
database Offset as o
14+
participant "Event Processor Stream" as eps
15+
note right of eps : tag = 'domain-to-external'
16+
database "Query DB" as q
17+
18+
eps -> eps++: init
19+
eps -> o++: readOffset()
20+
return **offset**
21+
eps -> j: readEventsByTag(**tag**, **offset**)
22+
23+
loop **stream** of events with the tag "domain-to-external" \nstarting at the specified offset
24+
ebc -> j++: persist domain event with tag(s)\n Set("domain-to-external", ...)
25+
return
26+
27+
j -->> eps: eventEnvelope
28+
eps -> eps++: processEvent(eventEnvelope.event)
29+
eps -> q++ : write
30+
return
31+
eps --> o-- :writeOffset(eventEnvelope.offset)
32+
33+
end
34+
@enduml

0 commit comments

Comments
 (0)