Skip to content

Conversation

@fmterrorf
Copy link
Contributor

@fmterrorf fmterrorf commented Sep 20, 2024

This adds the option of having batched event handlers to speed things up. Please refer to the included documentation for details.

This PR is mostly taken from #509

Related PRs
commanded/commanded-ecto-projections#54

1. devbox init
2. devbox add elixir@1.16.3
3. use direnv for loading devbox env

There is a .tool-versions but we got rid of asdf from our dev machines so setting this up instead in order to avoid each of us from having to setup asdf up on our machines again.
Co-authored-by: Cees de Groot <cg@cdegroot.com>
Skeleton code for batched handling, to be tested

Basic skeleton test

Ensure that no conflicting options/handlers are configured

Move the module compilation test into describe block

Add code to do error handling on batches (untested)

Add tests for batching error handling

Allow ack_events to take a list of events and acknowledge last event

batch handler telemetry

error handling fixes

Update support batch handler logic

Add batch handler telemetry tests

Align naming of batch handler test module

Update retry logic

Update tests

Add configuration test for ensuring batch_size and concurrency not test at same time

Fix handle_batch comparison and add test case

Add tests for handle_batch state and update last_event_seen logic

Remove list implementation for InMemory#ack_event/3

Minor update for event_handler_batch_state_test

Document handle_batch/2 for state update

Add more event handler test coverage

- upcast test for handle_batch/2
- batch_reset_event_handler_test

Do not retry if :skip is received

Commanded.options() -> Commanded.Application.options()

Allow skipping events for batched handler

Include commanded#493 and commanded#489 in CHANGELOG

Include commanded#493 and commanded#489 in CHANGELOG

Release v1.4.0

bugfix: retry command executing when the aggregator is down right before the execution

chore: improve typespec for router dispatch resp

Include commanded#494 in CHANGELOG

Remove duplicate event apply when receiving missed events published to aggregate's event stream

Update CHANGELOG

Use Erlang v25.0.4 and Elixir v1.14.0-otp-25

Require at least Elixir v1.10

Fix typespec typo in Commanded.Application

Use `:test` Mix env for GitHub workflow

To catch dialyzer and credo errors in test files.

Release v1.4.1

retry remaining batch when skipping event

only retry for batch

Reduce compile-time dependencies

Reformat

Cleanup some TODO comments that have been implemented

Remove unused code in test

Make Credo happy

Update docs for :skip return in error callback

Filter any already seen events from handle_batch

Update docs on event given to batch error hander
Format

Fix dialyzer error
* Pull in initial work by @davydog187.

* Redefine how acknowledgement works

Skeleton code for batched handling, to be tested

Basic skeleton test

Ensure that no conflicting options/handlers are configured

Move the module compilation test into describe block

Add code to do error handling on batches (untested)

Add tests for batching error handling

Allow ack_events to take a list of events and acknowledge last event

batch handler telemetry

error handling fixes

Update support batch handler logic

Add batch handler telemetry tests

Align naming of batch handler test module

Update retry logic

Update tests

Add configuration test for ensuring batch_size and concurrency not test at same time

Fix handle_batch comparison and add test case

Add tests for handle_batch state and update last_event_seen logic

Remove list implementation for InMemory#ack_event/3

Minor update for event_handler_batch_state_test

Document handle_batch/2 for state update

Add more event handler test coverage

- upcast test for handle_batch/2
- batch_reset_event_handler_test

Do not retry if :skip is received

Commanded.options() -> Commanded.Application.options()

Allow skipping events for batched handler

Include commanded#493 and commanded#489 in CHANGELOG

Include commanded#493 and commanded#489 in CHANGELOG

Release v1.4.0

bugfix: retry command executing when the aggregator is down right before the execution

chore: improve typespec for router dispatch resp

Include commanded#494 in CHANGELOG

Remove duplicate event apply when receiving missed events published to aggregate's event stream

Update CHANGELOG

Use Erlang v25.0.4 and Elixir v1.14.0-otp-25

Require at least Elixir v1.10

Fix typespec typo in Commanded.Application

Use `:test` Mix env for GitHub workflow

To catch dialyzer and credo errors in test files.

Release v1.4.1

retry remaining batch when skipping event

only retry for batch

Reduce compile-time dependencies

Reformat

Cleanup some TODO comments that have been implemented

Remove unused code in test

Make Credo happy

Update docs for :skip return in error callback

Filter any already seen events from handle_batch

Update docs on event given to batch error hander

* Drop support for {:error, reason, event}

Format

Fix dialyzer error

* Update docs

* Use delegate_event_to_handler & make confirm_receipt be more generic

* Do not retry :skip events

---------

Co-authored-by: Dave Lucia <davelucianyc@gmail.com>
Co-authored-by: Cees de Groot <cg@evrl.com>
@fmterrorf fmterrorf marked this pull request as ready for review September 20, 2024 21:42
@cdegroot
Copy link
Contributor

Note: review can wait until Calmwave has dogfooded this for a bit.

@yordis
Copy link
Contributor

yordis commented Nov 28, 2024

Any updates on this one?

Copy link
Contributor

@drteeth drteeth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really great work, I think it's very close. I think it needs 3 things:

  1. The InMemory adapter needs to serve up batches of events
  2. The branch needs a reqbase against master
  3. Test suite to pass

as well.
TODO Batching: this holds true for the PostgreSQL implementation, needs to be added to in_memory and
verified for EventStoreDB.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the InMemory implementation is done. I think we can go ahead without EventStoreDB as I don't think it's going to see much use going forward.

Copy link
Contributor

@drteeth drteeth Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see the InMemory adapter implement batching as well before we merge. This is done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @drteeth,

From giving a second read at the code, it seems like the InMemory handler already implements the desired behavior of "acknowledging" the previous events in a batch here: https://github.com/calmwave-open-source/commanded/blob/batching-support/lib/commanded/event_store/adapters/in_memory/subscriber.ex#L17-L20

Given that, could you perhaps clarify what further work would be needed? The tests have been fixed and the branch updated with the latest upstream version.

Thanks!
Carlos

@anderslemke
Copy link

I'm rooting for this 🫶

Copy link
Contributor

@drteeth drteeth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good, just a little bit more to go.

@@ -0,0 +1,61 @@
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we leave the devbox stuff out?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can remove it.

@callback error(
error :: term(),
failed_event :: domain_event,
failed_event :: domain_event | [domain_event] | nil,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I get that you will only get the next behaviour when opting in to batching, it may break code that is depending on failed_event to definitely be there as per the old contract.

Is there a way to avoid changing this contract? Would having a batch_error callback for this new path make sense?

Talk to me about what we could do here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could eliminate the nil with List.wrap/1 so at least there is only that difference to deal with?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the very least the custom error module case needs to be dealt with:

module.error(error, data, failure_context)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, we can do without the nil here.

assert match?(%{state: 0}, metadata)

# Make sure the event handler doesn't continue running logic after the test is done
Process.exit(handler, :kill)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these needed? The handler is started with start_supervised, is there a reason that isn't enough to cleanup after the test?

Copy link
Contributor

@cblage cblage Jun 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was causing sometimes error logs in the tests, since it would continue processing events in the background for a second or two, and the Mock was only expecting 1 event.

telemetry_metadata = batch_telemetry_metadata(events, context, state)
start_time = telemetry_start(telemetry_metadata, :batch)

case delegate_event_to_handler(events, state) do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that you are repeating the cases here from the single event path, should we consider trying to make this contract explicit somehow?

I'm thinking that these two branches will drift out of sync overtime, and be a source of bugs.

Is there a way to avoid this? My knee-jerk thoughts go to a protocol or behaviour that the handler depends on with implementations for single and batched paths?

Maybe that's over-thinking it, WDYT?

Copy link
Contributor

@cdegroot cdegroot Jun 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the deeper issue is that we have too many return values to check for to begin with, and maybe this duplication just brings it out :-)

So my preference would be to let it slide, then think about better error handling, maybe collapse the cases down to a reasonable amount (2, ideally), then we can look at the code and see whether it's worth extracting.

@cdegroot cdegroot merged commit e8de0e6 into commanded:master Jun 25, 2025
@cdegroot cdegroot deleted the batching-support branch June 25, 2025 13:58
@badazz91
Copy link

hey @drteeth, @cdegroot - is there some documentation / guidelines on how to use this new feature? I would be really eager to try this out in our production app - if suitable :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

9 participants