ethreceipts: improve interactions with flaky providers #188
Conversation
```diff
 l.chainID, err = getChainID(ctx, l.provider)
 if err != nil {
-	l.chainID = big.NewInt(1) // assume mainnet in case of unlikely error
+	return fmt.Errorf("ethreceipts: failed to get chainID from provider: %w", err)
```
This assumption won't make things work (AsMessage will fail, etc.). We must fail here until AsMessage is removed or replaced.
```go
	return oks, nil
}

func (l *ReceiptsListener) searchFilterOnChain(ctx context.Context, subscriber *subscriber, filterers []Filterer) error {
```
Changed from sequential search to concurrent search, bounded by MaxConcurrentSearchOnChainWorkers.
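For readers skimming the diff, here is a minimal sketch of the bounding pattern being described, using a buffered channel as a semaphore. The function name searchConcurrently and the searchOne callback are hypothetical; the real implementation lives inside searchFilterOnChain and is bounded by MaxConcurrentSearchOnChainWorkers.

```go
// Sketch of bounded parallelism: one search per filterer, with at most
// maxWorkers searches in flight at any time.
func searchConcurrently(ctx context.Context, filterers []Filterer, maxWorkers int,
	searchOne func(context.Context, Filterer) error) error {

	sem := make(chan struct{}, maxWorkers)       // limits in-flight searches
	errCh := make(chan error, len(filterers))    // collects failures
	var wg sync.WaitGroup

	for _, f := range filterers {
		wg.Add(1)
		go func(f Filterer) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a worker slot
			defer func() { <-sem }() // release it when done
			if err := searchOne(ctx, f); err != nil {
				errCh <- err
			}
		}(f)
	}

	wg.Wait()
	close(errCh)
	for err := range errCh {
		return err // surface the first recorded error, if any
	}
	return nil
}
```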
```diff
 	chainID = id
 	return nil
-}, nil, 1*time.Second, 2, 3)
+}, nil, 1*time.Second, 2, 10)
```
This is going to return an error on failure, so make sure we allow plenty of time for retries.
```go
}

// max ~12s total wait time before giving up
br := breaker.New(log, 200*time.Millisecond, 1.2, 20)
```
The original breaker retried at most 4 times, starting at 1 second. This has a similar total waiting time, but the individual waits are much shorter; the objective is to maximize the number of retries. Note that each retry has a 4-second hard timeout, so the maximum total time in the worst case (if we hit the 4-second timeout on every attempt) would be considerably longer. Also note that we're now doing concurrent transaction receipt fetches, so we won't block subscriptions that don't need to be blocked.
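Not the breaker package's actual API, just a rough sketch of the shape described above: many quick retries with gentle exponential backoff, each attempt capped by a hard per-attempt timeout (assumed here to be 4 seconds). The name fetchWithRetries and its fetch callback are hypothetical.

```go
// Generic sketch of "many quick retries, each with a hard per-attempt timeout".
// The wait/factor/attempt numbers mirror the comment above; the breaker
// package's real interface may differ.
func fetchWithRetries(ctx context.Context, fetch func(ctx context.Context) error) error {
	wait := 200 * time.Millisecond
	var lastErr error

	for attempt := 0; attempt < 20; attempt++ {
		// Each attempt gets its own hard timeout so a hung provider call
		// cannot stall the whole retry loop.
		attemptCtx, cancel := context.WithTimeout(ctx, 4*time.Second)
		lastErr = fetch(attemptCtx)
		cancel()

		if lastErr == nil {
			return nil
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(wait):
		}
		wait = time.Duration(float64(wait) * 1.2) // gentle exponential backoff
	}
	return lastErr
}
```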
Force-pushed from 5b28b8e to 3cb414a

simplify, clean-up

Force-pushed from 3aa00fc to 4ba6a34
```diff
@@ -1,7 +1,9 @@
 package ethreceipts
```
Refactored so it can be used in concurrent scenarios.
```diff
@@ -1,13 +1,33 @@
 package ethreceipts
```
Added a mechanism to record and retry pending receipts. This allows receipts to be sent as soon as the flaky provider recovers.
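Roughly, the record-and-retry idea looks like the sketch below. The retryMu and pendingReceipts names come from the code shown further down in this thread; markPending, retryPendingLoop, and retryDuePendingReceipts are hypothetical names used only for illustration.

```go
// Illustrative sketch: when fetchTransactionReceipt fails, park the match
// instead of dropping it, and let a background sweep retry it later.
func (s *subscriber) markPending(receipt Receipt) {
	s.retryMu.Lock()
	defer s.retryMu.Unlock()
	s.pendingReceipts[receipt.TransactionHash()] = &pendingReceipt{
		receipt:     receipt,
		nextRetryAt: time.Now().Add(time.Second),
	}
}

// A periodic sweep retries parked receipts once their time comes, so
// delivery resumes as soon as the provider recovers.
func (s *subscriber) retryPendingLoop(ctx context.Context) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			s.retryDuePendingReceipts(ctx) // hypothetical helper
		}
	}
}
```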
ethreceipts/subscription.go
Outdated
```go
var _ Subscription = &subscriber{}

type pendingReceipt struct {
```
What is the logic around "pending receipts" and why do we need it? (I'm not questioning it, btw; it's been a while since I worked on this package, just hoping you can summarize for me.)
The current logic:

- Client subscribes to receipt A.
- Server recognizes receipt A exists (matchFilters), and enqueues getting the receipt details.
- Provider fails continuously, and fetchTransactionReceipt cannot fetch any data on time.
- Client never gets details on receipt A, and there's no clear indication something is wrong.

The proposed fix:

- Client subscribes to receipt A.
- Server recognizes receipt A exists (matchFilters), and enqueues getting the receipt details.
- Provider fails continuously, and fetchTransactionReceipt does not fetch any data.
- Server adds a pendingReceipt to a special queue, and retries periodically over an extended period of time.
- Either the client gets details on receipt A, or a clear error is logged; we might even force a connection close to indicate a major problem occurred (a sketch of that give-up path follows the reference below):

ethkit/ethreceipts/subscription.go, line 398 in 4ba6a34:

```go
if currentPending.attempts >= maxReceiptRetryAttempts {
```
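For illustration, the give-up path around that check might look roughly like the following; the logging call, the txnHash loop variable, and the optional forced close are sketches, not the final behavior.

```go
// Illustrative give-up branch inside the retry sweep.
if currentPending.attempts >= maxReceiptRetryAttempts {
	// The provider never recovered in time: surface the failure loudly
	// instead of staying silent, and drop the pending entry.
	log.Printf("ethreceipts: giving up on receipt %v after %d attempts",
		txnHash, currentPending.attempts) // txnHash: hypothetical loop variable
	delete(s.pendingReceipts, txnHash)
	// Optionally close the subscription so the consumer gets a clear
	// terminal signal instead of silence.
	// s.Unsubscribe()
	continue
}
```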
I see... when you say client and server, you're referring to the internal pub-sub mechanism inside of ethreceipts? Because there is no remote client and server in that; it's just the app (consumer) that subscribes to the listener/matcher/publisher, and once it finds the match, it publishes the value?
interesting.. so you're solving here for a node issue where the monitor knows the transaction happened (as the monitor has the logs of a txn), but we don't have the receipt, and the node is failing to give us the receipt..?
I also noticed in ethreceipts.Receipts{} we have a "log" and it says "// TODO is not used". Sounds like we should use the logger and we should log warnings / errors in such a situation, so we know if there are things breaking down that we're not handling well
The problematic case looks like:

- A subscriber scans for the transaction hash 0xA.
- The underlying eth provider is healthy (monitor), and matchFilters finds 0xA, then uses fetchTransactionReceipt to get details of the receipt.
- The underlying ethrpc provider starts failing, and fetchTransactionReceipt ultimately fails: https://github.com/0xsequence/ethkit/blob/master/ethreceipts/subscription.go#L126
- The subscriber never gets any information on transaction hash 0xA, even if the ethrpc provider recovers its health later on.
This test case demonstrates the problem better:

ethkit/ethreceipts/ethreceipts_test.go, line 983 in 4ba6a34:

```go
t.Run("Wait for txn receipts with a healthy monitor and a flaky provider", func(t *testing.T) {
```
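As an aside, a flaky provider of this kind can be simulated with a plain http.RoundTripper that fails the first few calls and then recovers. The flakyTransport type below is a hypothetical helper, not the actual test fixture used in ethreceipts_test.go.

```go
// flakyTransport fails the first failCount round trips with a transport
// error, then passes requests through to the real transport.
type flakyTransport struct {
	next      http.RoundTripper
	mu        sync.Mutex
	failCount int
}

func (t *flakyTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	t.mu.Lock()
	shouldFail := t.failCount > 0
	if shouldFail {
		t.failCount--
	}
	t.mu.Unlock()

	if shouldFail {
		return nil, errors.New("flaky provider: simulated transport failure")
	}
	return t.next.RoundTrip(req)
}

// Usage sketch: hand the provider an *http.Client built around this transport,
// e.g. &http.Client{Transport: &flakyTransport{next: http.DefaultTransport, failCount: 5}}.
```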
Pull Request Overview
This PR enhances the ethreceipts package to better handle flaky or unreliable RPC providers by implementing retry mechanisms, improved concurrency controls, and thread-safe HTTP client management. The changes allow the system to gracefully recover from transient provider failures while maintaining receipt delivery guarantees.
- Adds retry mechanism for failed receipt fetches with exponential backoff
- Implements thread-safe HTTP client swapping in the Provider
- Increases concurrency limits and adds bounded parallelism for on-chain searches
- Fixes potential deadlocks in filter registration
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| ethrpc/option.go | Updated to use thread-safe SetHTTPClient method |
| ethrpc/ethrpc.go | Added mutex-protected HTTP client access with getter/setter methods (see the sketch after this table) |
| ethreceipts/subscription.go | Implemented pending receipt tracking and retry logic with exponential backoff; fixed filter registration deadlock |
| ethreceipts/filterer.go | Added atomic operations for thread-safe block number access and safe channel closure |
| ethreceipts/ethreceipts_test.go | Added comprehensive tests for flaky provider scenarios and deadlock detection |
| ethreceipts/ethreceipts.go | Improved error handling, increased concurrency, and parallelized on-chain searches |
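The thread-safe client swap described for ethrpc above is essentially the standard RWMutex getter/setter pattern. A minimal sketch follows; only SetHTTPClient is named in this PR, so the struct fields and the getHTTPClient accessor are assumptions.

```go
// Sketch of mutex-protected access to the provider's HTTP client, so the
// client can be swapped at runtime (e.g. by tests injecting a flaky
// transport) without racing concurrent RPC calls.
type Provider struct {
	mu         sync.RWMutex
	httpClient *http.Client
}

// SetHTTPClient replaces the HTTP client used for subsequent requests.
func (p *Provider) SetHTTPClient(c *http.Client) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.httpClient = c
}

// getHTTPClient is a hypothetical accessor used by request paths.
func (p *Provider) getHTTPClient() *http.Client {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.httpClient
}
```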
ethreceipts/subscription.go
Outdated
```go
// If context is cancelled, release the claim so the item can be retried later.
s.retryMu.Lock()
if current, ok := s.pendingReceipts[p.receipt.TransactionHash()]; ok && current == p {
	current.nextRetryAt = time.Now() // Reschedule immediately.
```
Copilot AI commented on Oct 27, 2025:
Rescheduling immediately on context cancellation could lead to busy-waiting. Consider using a small delay (e.g., 100ms) instead of time.Now() to avoid excessive retries when the system is shutting down.
Suggested change:

```diff
-current.nextRetryAt = time.Now() // Reschedule immediately.
+// Reschedule with a small delay to avoid busy-waiting.
+current.nextRetryAt = time.Now().Add(100 * time.Millisecond)
```