
Conversation

jkawan (Contributor) commented Aug 17, 2025

  1. Added changes so that closing the connection for the chainsync, block-fetch, and tx-submission protocols no longer returns an error; changes were made in the client, server, and respective protocol files.
  2. Added unit tests covering the change.

Closes #1112

Summary by CodeRabbit

Release Notes

  • New Features

    • Added protocol state inspection methods for checking terminal states.
  • Bug Fixes

    • Improved error handling and synchronization in protocol shutdown operations.
  • Tests

    • Added comprehensive test coverage for blockfetch, chainsync, and txsubmission protocols.
    • Enhanced test cleanup procedures for better goroutine synchronization.


Jenita added 2 commits August 16, 2025 20:33
Signed-off-by: Jenita <jkawan@blinklabs.io>
Signed-off-by: Jenita <jkawan@blinklabs.io>
jkawan requested a review from a team as a code owner August 17, 2025 01:43
Signed-off-by: Jenita <jkawan@blinklabs.io>

Jenita added 3 commits August 18, 2025 20:39
Signed-off-by: Jenita <jkawan@blinklabs.io>
Signed-off-by: Jenita <jkawan@blinklabs.io>
Signed-off-by: Jenita <jkawan@blinklabs.io>
Signed-off-by: Jenita <jkawan@blinklabs.io>
Signed-off-by: Jenita <jkawan@blinklabs.io>
wolf31o2 requested a review from agaffney September 20, 2025 14:02
Jenita added 2 commits October 20, 2025 14:11
Signed-off-by: Jenita <jkawan@blinklabs.io>
Signed-off-by: Jenita <jkawan@blinklabs.io>
wolf31o2 (Member) commented:

@coderabbitai review

coderabbitai bot (Contributor) commented Oct 26, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

coderabbitai bot (Contributor) commented Oct 26, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

📝 Walkthrough

Walkthrough

This PR implements protocol state tracking and conditional error handling to address connection-close behavior when protocols are shut down. It adds IsDone() and GetDoneState() methods to track protocol terminal states, introduces a currentState field to the Protocol struct for state management, and modifies Stop() implementations in the blockfetch and chainsync clients to send final messages only when the protocols are still active. New test suites validate protocol initialization, configuration, and message-sending behavior across the blockfetch, chainsync, and txsubmission protocols. Connection tests are refactored to verify that error handling differs when protocols are active versus stopped.
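
For orientation, a minimal runnable sketch of the guarded Stop() pattern this PR introduces; the onceStop/busyMutex/IsDone names follow the walkthrough, but the types here are illustrative stand-ins rather than the actual gouroboros implementation:

package main

import (
    "fmt"
    "sync"
)

// client is an illustrative stand-in for a mini-protocol client.
type client struct {
    onceStop  sync.Once
    busyMutex sync.Mutex
    isDone    func() bool  // stands in for Protocol.IsDone()
    sendDone  func() error // stands in for sending the protocol's Done message
}

// Stop sends the Done message at most once, and only while the protocol is
// still active; stopping an already-done protocol is a no-op.
func (c *client) Stop() error {
    var err error
    c.onceStop.Do(func() {
        c.busyMutex.Lock()
        defer c.busyMutex.Unlock()
        if !c.isDone() {
            err = c.sendDone()
        }
    })
    return err
}

func main() {
    c := &client{
        isDone:   func() bool { return false },
        sendDone: func() error { fmt.Println("-> MsgDone"); return nil },
    }
    fmt.Println(c.Stop()) // sends Done once
    fmt.Println(c.Stop()) // second call is a no-op via onceStop
}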

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

  • Core logic changes across multiple protocol implementations require careful verification
  • New API methods (IsDone(), GetDoneState()) and currentState field tracking need validation for correctness with existing state machine
  • Multiple similar test files (blockfetch, chainsync, txsubmission) reduce individual review effort but cumulative coverage is significant
  • Conditional logic in Stop() methods affects error propagation semantics and warrants close inspection

Areas requiring extra attention:

  • Protocol.IsDone() state detection logic and its correctness against various protocol states
  • Interaction between currentState tracking and existing state machine in stateLoop
  • Conditional Stop() implementations in blockfetch/client.go and chainsync/client.go — verify state checks prevent message sends appropriately
  • Connection error handling behavior with active vs. stopped protocols in connection_test.go refactoring
  • Test cleanup delays (100ms sleeps in blockfetch/client_test.go and chainsync/client_test.go) — assess if timing is reliable

Possibly related PRs

  • #1233: Modifies protocol/protocol.go with currentState field and state-access methods, directly related to the state tracking infrastructure introduced here.

Pre-merge checks and finishing touches

❌ Failed checks (1 inconclusive)

  • Out of Scope Changes check: ❓ Inconclusive. While most changes are in-scope, minor formatting changes (blank lines in txsubmission/client.go and keepalive/client_test.go) and a 100ms sleep delay in test cleanup appear tangential to the core objective. Resolution: clarify whether the sleep delays in blockfetch/client_test.go and chainsync/client_test.go are necessary for the feature or should be split into a distinct refactoring PR.

✅ Passed checks (3 passed)

  • Description check: ✅ Passed. Check skipped: CodeRabbit's high-level summary is enabled.
  • Title check: ✅ Passed. The title clearly and specifically describes the main change: preventing error returns on connection close for three specific protocols (chainsync, block-fetch, tx-submission) after proper shutdown.
  • Linked Issues check: ✅ Passed. The PR directly addresses issue #1112 by implementing the required functionality: preventing error returns when protocols are properly shut down before connection close across chainsync, block-fetch, and tx-submission.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 90450f2 and 26c42c7.

📒 Files selected for processing (5)
  • connection_test.go (2 hunks)
  • protocol/blockfetch/client_test.go (1 hunks)
  • protocol/chainsync/client.go (1 hunks)
  • protocol/chainsync/client_test.go (1 hunks)
  • protocol/protocol.go (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • protocol/protocol.go
🧰 Additional context used
🧬 Code graph analysis (1)
protocol/chainsync/client.go (7)
protocol/keepalive/messages.go (1)
  • NewMsgDone (94-101)
protocol/localtxmonitor/messages.go (1)
  • NewMsgDone (79-86)
protocol/chainsync/messages.go (1)
  • NewMsgDone (269-276)
protocol/localstatequery/messages.go (1)
  • NewMsgDone (245-252)
protocol/peersharing/messages.go (1)
  • NewMsgDone (88-95)
protocol/localtxsubmission/messages.go (1)
  • NewMsgDone (118-125)
protocol/txsubmission/messages.go (1)
  • NewMsgDone (169-176)
🪛 GitHub Actions: go-test
protocol/blockfetch/client_test.go

[error] 103-103: TestGetBlock: found unexpected goroutines after test execution


[error] 103-103: TestGetBlockNoBlocks: found unexpected goroutines after test execution


[error] 104-104: TestGetCurrentTip: found unexpected goroutines after test execution


[error] 104-104: TestGetAvailableBlockRange: found unexpected goroutines after test execution

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: cubic · AI code reviewer
  • GitHub Check: Analyze (go)
🔇 Additional comments (2)
protocol/chainsync/client.go (1)

150-153: Idempotent Stop via IsDone guard looks correct

Conditioning the Done send on !c.IsDone() under busyMutex keeps Stop idempotent and avoids spurious Done messages after a terminal state, while still propagating any SendMessage error via the outer err. This aligns with the intended shutdown semantics.

connection_test.go (1)

315-353: Basic error-handling tests (DialFailure/DoubleClose) look good

The new TestBasicErrorHandling subtests correctly exercise dial failure and Close idempotency without overreaching into protocol behavior. They’re small, focused, and compatible with goleak when Close() behaves as intended.



coderabbitai bot (Contributor) left a comment

Actionable comments posted: 8

♻️ Duplicate comments (1)
protocol/protocol.go (1)

139-151: IsDone() behavior matches intended semantics

Treating AgencyNone and InitialState as done addresses idle/never‑started cases cleanly.
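
A hedged sketch of those semantics with simplified stand-in types (the real implementation lives in protocol/protocol.go and uses the State/StateMap types from protocol/state.go):

package main

import "fmt"

type agency int

const (
    agencyNone agency = iota // nobody holds agency: terminal state
    agencyClient
    agencyServer
)

type state string

type protocol struct {
    stateMap     map[state]agency
    initialState state
    currentState state
}

// IsDone reports done when the current state is terminal (AgencyNone) or the
// protocol never left its initial state.
func (p *protocol) IsDone() bool {
    return p.stateMap[p.currentState] == agencyNone ||
        p.currentState == p.initialState
}

func main() {
    p := &protocol{
        stateMap:     map[state]agency{"Idle": agencyClient, "Done": agencyNone},
        initialState: "Idle",
        currentState: "Idle",
    }
    fmt.Println(p.IsDone()) // true: never started
    p.currentState = "Done"
    fmt.Println(p.IsDone()) // true: terminal state
}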

🧹 Nitpick comments (6)
protocol/blockfetch/client.go (1)

112-118: Consider locking Stop() like chainsync to avoid interleaving

Align with chainsync by guarding the Done send with busyMutex to avoid racing with GetBlock/GetBlockRange.

Apply:

 func (c *Client) Stop() error {
   var err error
   c.onceStop.Do(func() {
     c.Protocol.Logger().
       Debug("stopping client protocol",
         "component", "network",
         "protocol", ProtocolName,
         "connection_id", c.callbackContext.ConnectionId.String(),
       )
-    if !c.IsDone() {
+    c.busyMutex.Lock()
+    defer c.busyMutex.Unlock()
+    if !c.IsDone() {
       msg := NewMsgClientDone()
       if err = c.SendMessage(msg); err != nil {
         return
       }
     }
   })
   return err
 }
protocol/blockfetch/blockfetch_test.go (2)

51-59: Avoid Read() returning (0, nil); block or EOF to prevent busy spin

Returning 0, nil violates net.Conn expectations and can spin the muxer. Make Read block until close, then return EOF.

Apply:

-func (c *testConn) Read(b []byte) (n int, err error) { return 0, nil }
+func (c *testConn) Read(b []byte) (n int, err error) {
+  <-c.closeChan
+  return 0, io.EOF
+}

159-165: Increase timeout to reduce flakiness

100ms can be tight on CI. Consider 500ms–1s to avoid false negatives.

connection_test.go (2)

151-156: Typo: “stoppeds”.

Minor nit.

-		// Protocol is stoppeds
+		// Protocol is stopped

192-210: Prefer waiting for protocol init instead of fixed sleep.

Replace fixed sleep with the same polling used above to reduce flakes under CI load.
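
One possible shape for that polling, sketched as a hypothetical test helper (imports: testing, time); the condition closure is whatever readiness signal the test already has:

func waitFor(t *testing.T, timeout time.Duration, cond func() bool) {
    t.Helper()
    deadline := time.Now().Add(timeout)
    for time.Now().Before(deadline) {
        if cond() {
            return
        }
        time.Sleep(10 * time.Millisecond)
    }
    t.Fatal("timed out waiting for condition")
}

For example, waitFor(t, time.Second, func() bool { return oConn.ChainSync() != nil }) replaces the fixed sleep.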

protocol/txsubmission/txsubmission_test.go (1)

154-167: Good message-send assertion; consider shorter timeout.

Looks fine; 2s is generous, but with a running muxer you can trim to 500ms.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f00972e and 3e78cb5.

📒 Files selected for processing (9)
  • connection.go (2 hunks)
  • connection_test.go (2 hunks)
  • protocol/blockfetch/blockfetch_test.go (1 hunks)
  • protocol/blockfetch/client.go (1 hunks)
  • protocol/chainsync/chainsync_test.go (1 hunks)
  • protocol/chainsync/client.go (1 hunks)
  • protocol/protocol.go (8 hunks)
  • protocol/txsubmission/client.go (1 hunks)
  • protocol/txsubmission/txsubmission_test.go (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: go-test
connection_test.go

[error] 74-74: TestErrorHandlingWithActiveProtocols: unexpected error when creating Connection object: handshake: timeout waiting on transition from protocol state Propose


[error] 132-132: TestErrorHandlingWithActiveProtocols: unexpected error when creating Connection object: handshake: timeout waiting on transition from protocol state Propose

🔇 Additional comments (6)
protocol/chainsync/client.go (1)

150-154: Good guard: only send Done when not already done

Prevents redundant Done and aligns Stop with protocol state. Locking via busyMutex is appropriate.

protocol/protocol.go (1)

491-540: State tracking and readiness signalling look solid

Locking around currentState updates/reads and using stateEntry for agency/timeout is correct. Timeout message uses CurrentState() safely.

If any protocol has multiple AgencyNone states, ensure StateMap defines exactly one terminal state; otherwise GetDoneState() can be ambiguous.

Also applies to: 550-551, 556-569, 592-595, 630-631
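
A hedged sketch of such a uniqueness check, assuming StateMap is a map keyed by State whose entries carry an Agency field (per the protocol/state.go references above); a test could assert len(terminalStates(sm)) == 1 for each protocol:

func terminalStates(stateMap protocol.StateMap) []protocol.State {
    var done []protocol.State
    for state, entry := range stateMap {
        if entry.Agency == protocol.AgencyNone {
            // a terminal state: nobody holds agency here
            done = append(done, state)
        }
    }
    return done
}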

protocol/chainsync/chainsync_test.go (1)

151-166: Good coverage for client Done send.

Starting the muxer and asserting a write to the fake conn verifies the basic send path.

protocol/txsubmission/client.go (1)

86-92: No-op change.

Formatting-only; safe to merge.

protocol/txsubmission/txsubmission_test.go (2)

176-181: Server start/stop smoke test LGTM.

No issues.


16-29: Import sync; prepare for safer conn behavior.

You’ll add sync for mutex/once; rest unchanged.

 import (
 	"io"
 	"log/slog"
 	"net"
+	"sync"
 	"testing"
 	"time"

Likely an incorrect or invalid review comment.

connection.go Outdated
Comment on lines 253 to 297
// handleConnectionError handles connection-level errors centrally
func (c *Connection) handleConnectionError(err error) error {
    if err == nil {
        return nil
    }

    // Only propagate EOF errors when acting as a client with active server-side protocols
    if errors.Is(err, io.EOF) {
        // Check if we have any active server-side protocols
        if c.server {
            return err
        }

        // For clients, only propagate EOF if we have active server protocols
        hasActiveServerProtocols := false
        if c.chainSync != nil && c.chainSync.Server != nil && !c.chainSync.Server.IsDone() {
            hasActiveServerProtocols = true
        }
        if c.blockFetch != nil && c.blockFetch.Server != nil && !c.blockFetch.Server.IsDone() {
            hasActiveServerProtocols = true
        }
        if c.txSubmission != nil && c.txSubmission.Server != nil && !c.txSubmission.Server.IsDone() {
            hasActiveServerProtocols = true
        }
        if c.localStateQuery != nil && c.localStateQuery.Server != nil && !c.localStateQuery.Server.IsDone() {
            hasActiveServerProtocols = true
        }
        if c.localTxMonitor != nil && c.localTxMonitor.Server != nil && !c.localTxMonitor.Server.IsDone() {
            hasActiveServerProtocols = true
        }
        if c.localTxSubmission != nil && c.localTxSubmission.Server != nil && !c.localTxSubmission.Server.IsDone() {
            hasActiveServerProtocols = true
        }

        if hasActiveServerProtocols {
            return err
        }

        // EOF with no active server protocols is normal connection closure
        return nil
    }

    // For non-EOF errors, always propagate
    return err
}
coderabbitai bot (Contributor) commented:

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

EOF handling logic inverted for server/client; also consider ErrUnexpectedEOF

  • As server, we should NOT surface EOF if all server-side protocols (chainsync, block-fetch, tx-submission; plus local protocols in NtC) are done or never started. Current code always returns err when c.server is true.
  • As client, we should check our client-side protocols, not server-side, to decide whether EOF is unexpected.
  • Also handle io.ErrUnexpectedEOF (or ensure the muxer remaps it to io.EOF).

Proposed fix:

 func (c *Connection) handleConnectionError(err error) error {
   if err == nil {
     return nil
   }
-  // Only propagate EOF errors when acting as a client with active server-side protocols
-  if errors.Is(err, io.EOF) {
-    // Check if we have any active server-side protocols
-    if c.server {
-      return err
-    }
-
-    // For clients, only propagate EOF if we have active server protocols
-    hasActiveServerProtocols := false
-    if c.chainSync != nil && c.chainSync.Server != nil && !c.chainSync.Server.IsDone() {
-      hasActiveServerProtocols = true
-    }
-    if c.blockFetch != nil && c.blockFetch.Server != nil && !c.blockFetch.Server.IsDone() {
-      hasActiveServerProtocols = true
-    }
-    if c.txSubmission != nil && c.txSubmission.Server != nil && !c.txSubmission.Server.IsDone() {
-      hasActiveServerProtocols = true
-    }
-    if c.localStateQuery != nil && c.localStateQuery.Server != nil && !c.localStateQuery.Server.IsDone() {
-      hasActiveServerProtocols = true
-    }
-    if c.localTxMonitor != nil && c.localTxMonitor.Server != nil && !c.localTxMonitor.Server.IsDone() {
-      hasActiveServerProtocols = true
-    }
-    if c.localTxSubmission != nil && c.localTxSubmission.Server != nil && !c.localTxSubmission.Server.IsDone() {
-      hasActiveServerProtocols = true
-    }
-
-    if hasActiveServerProtocols {
-      return err
-    }
-
-    // EOF with no active server protocols is normal connection closure
-    return nil
-  }
+  // Treat EOF/UnexpectedEOF as connection closed, decide based on active protocols for our role
+  if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
+    hasActive := false
+    if c.server {
+      // Server: check server-side protocols
+      if c.chainSync != nil && c.chainSync.Server != nil && !c.chainSync.Server.IsDone() {
+        hasActive = true
+      }
+      if c.blockFetch != nil && c.blockFetch.Server != nil && !c.blockFetch.Server.IsDone() {
+        hasActive = true
+      }
+      if c.txSubmission != nil && c.txSubmission.Server != nil && !c.txSubmission.Server.IsDone() {
+        hasActive = true
+      }
+      if c.localStateQuery != nil && c.localStateQuery.Server != nil && !c.localStateQuery.Server.IsDone() {
+        hasActive = true
+      }
+      if c.localTxMonitor != nil && c.localTxMonitor.Server != nil && !c.localTxMonitor.Server.IsDone() {
+        hasActive = true
+      }
+      if c.localTxSubmission != nil && c.localTxSubmission.Server != nil && !c.localTxSubmission.Server.IsDone() {
+        hasActive = true
+      }
+    } else {
+      // Client: check client-side protocols
+      if c.chainSync != nil && c.chainSync.Client != nil && !c.chainSync.Client.IsDone() {
+        hasActive = true
+      }
+      if c.blockFetch != nil && c.blockFetch.Client != nil && !c.blockFetch.Client.IsDone() {
+        hasActive = true
+      }
+      if c.txSubmission != nil && c.txSubmission.Client != nil && !c.txSubmission.Client.IsDone() {
+        hasActive = true
+      }
+      if c.localStateQuery != nil && c.localStateQuery.Client != nil && !c.localStateQuery.Client.IsDone() {
+        hasActive = true
+      }
+      if c.localTxMonitor != nil && c.localTxMonitor.Client != nil && !c.localTxMonitor.Client.IsDone() {
+        hasActive = true
+      }
+      if c.localTxSubmission != nil && c.localTxSubmission.Client != nil && !c.localTxSubmission.Client.IsDone() {
+        hasActive = true
+      }
+    }
+    // Propagate only if a protocol is still active; otherwise swallow as normal closure
+    if hasActive {
+      return err
+    }
+    return nil
+  }
   // For non-EOF errors, always propagate
   return err
 }

To confirm muxer behavior for UnexpectedEOF remapping, run:

Also applies to: 335-347


🏁 Script executed:

#!/bin/bash
# Check if muxer remaps io.ErrUnexpectedEOF to io.EOF
rg -nP 'ErrUnexpectedEOF|UnexpectedEOF' muxer/ --context 2 || true

Length of output: 0






🏁 Script executed:

#!/bin/bash
# 1. Check if muxer directory exists and list structure
echo "=== Checking muxer directory ==="
fd --type d 'muxer' . || echo "No muxer directory found"

# 2. Search for EOF handling patterns more broadly in codebase
echo -e "\n=== Searching for EOF handling patterns ==="
rg -n 'io\.EOF|ErrUnexpectedEOF' --type go -B2 -A2 || echo "No EOF patterns found"

# 3. Verify current connection.go exists and show lines 253-297
echo -e "\n=== Verifying connection.go current state ==="
if [ -f connection.go ]; then
  sed -n '253,297p' connection.go
else
  echo "connection.go not found at root; searching for it..."
  fd --type f 'connection.go' . --exec head -20 {}
fi

Length of output: 5839


🌐 Web query:

Go io.ErrUnexpectedEOF error type documentation

💡 Result:

io.ErrUnexpectedEOF

  • What it is: a package-level error variable defined as
    var ErrUnexpectedEOF = errors.New("unexpected EOF")
    meaning EOF was encountered in the middle of reading a fixed-size block or structured data. [1]

  • Where it’s declared: package io in the Go standard library. [1]

  • When it’s returned: commonly by helpers that require reading an exact number of bytes (e.g. io.ReadFull, io.ReadAtLeast) and by packages that read fixed-size binary structures (e.g. encoding/binary) when EOF occurs after some but not all bytes were read. If no bytes were read, functions typically return io.EOF instead. [2][3]

  • How to check it: since Go 1.13 errors may be wrapped, use errors.Is(err, io.ErrUnexpectedEOF) to detect it robustly. (Older code sometimes used err == io.ErrUnexpectedEOF.) [4]

Sources:
[1] io package docs (ErrUnexpectedEOF definition).
[2] io.ReadFull / ReadAtLeast behavior (returns ErrUnexpectedEOF when partial read then EOF).
[3] encoding/binary.Read behavior (returns io.ErrUnexpectedEOF if EOF after some bytes).
[4] Go error-handling FAQ (use errors.Is for comparisons).



🏁 Script executed:

#!/bin/bash
# Get current state of connection.go lines 250-350 for full context
echo "=== git/connection.go lines 250-350 ==="
sed -n '250,350p' git/connection.go

Length of output: 100



🏁 Script executed:

#!/bin/bash
# Find connection.go and show lines 250-350
echo "=== Locating connection.go ==="
CONN_FILE=$(fd -t f 'connection\.go$' 2>/dev/null | head -1)
echo "Found at: $CONN_FILE"

if [ -n "$CONN_FILE" ]; then
  echo -e "\n=== $CONN_FILE lines 250-350 ==="
  sed -n '250,350p' "$CONN_FILE"
else
  echo "connection.go not found"
fi

Length of output: 3089



🏁 Script executed:

#!/bin/bash
# Check lines 335-347 for the "also applies to" section
CONN_FILE=$(fd -t f 'connection\.go$' 2>/dev/null | head -1)
if [ -n "$CONN_FILE" ]; then
  echo "=== $CONN_FILE lines 330-355 (context for 335-347) ==="
  sed -n '330,355p' "$CONN_FILE"
fi

Length of output: 802


Fix EOF handling logic: server/client role checks are inverted

The current code has the logic backwards:

  • Server (line 266): returns EOF unconditionally without checking active server-side protocols
  • Client (lines 271–287): checks server-side protocol status instead of client-side

This means servers always propagate EOF (even when protocols are active), and clients check the wrong role's protocols. The proposed fix correctly swaps the logic: server checks Server fields, client checks Client fields. Additionally, handle io.ErrUnexpectedEOF alongside io.EOF, as the protocol layer already uses it for partial reads.

Apply the proposed diff and ensure the muxer behavior aligns with the corrected logic.

🤖 Prompt for AI Agents
In connection.go around lines 253 to 297, the EOF handling logic is inverted and
misses io.ErrUnexpectedEOF; change the EOF check to treat both io.EOF and
io.ErrUnexpectedEOF the same, and swap the role-specific checks so that when
c.server is true you inspect the protocol .Server fields for active server-side
protocols, and when c.server is false (client) you inspect the protocol .Client
fields for active client-side protocols; if any active protocols are found
return the error, otherwise treat EOF/ErrUnexpectedEOF as normal closure and
return nil.

Signed-off-by: Jenita <jkawan@blinklabs.io>
coderabbitai bot (Contributor) left a comment

Actionable comments posted: 6

♻️ Duplicate comments (1)
connection_test.go (1)

225-231: NtN handshake: align roles and remove client–client pattern.

oConn is a client (no WithServer(true)), but mock is also ProtocolRoleClient and sends a handshake response. That’s client–client with the response from the wrong side and can stall the Propose state. This mirrors earlier feedback.

Fix it in one of two ways; the simplest here: keep oConn as the client, make the mock the server, and only send the NtN response:

-    mockConn := ouroboros_mock.NewConnection(
-        ouroboros_mock.ProtocolRoleClient,
+    mockConn := ouroboros_mock.NewConnection(
+        ouroboros_mock.ProtocolRoleServer,
         []ouroboros_mock.ConversationEntry{
-            ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
-            ouroboros_mock.ConversationEntryHandshakeNtNResponse,
+            ouroboros_mock.ConversationEntryHandshakeNtNResponse,
         },
     )
🧹 Nitpick comments (2)
connection_test.go (2)

34-36: Reduce goleak false positives by ignoring known mock runner goroutine (optional).

If leaks persist after proper shutdown, ignore the mock runner top‑function to stabilize CI.

Example:

defer goleak.VerifyNone(t,
    goleak.IgnoreTopFunction("github.com/blinklabs-io/ouroboros-mock.(*Connection).run"),
)

Use only if cleanup is correct and a stable residual goroutine remains from the mock.

Also applies to: 223-229, 263-266


103-112: Avoid spin‑wait; replace with a deterministic wait helper.

Polling oConn.ChainSync() is flaky. Prefer a helper that blocks on a ready signal (e.g., from the callback barrier) or exposes a handshake‑finished channel.

I can provide a small test helper like waitForChainSyncServer(t, oConn, 1*time.Second) that uses a barrier instead of sleeps.

Also applies to: 185-194
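
A sketch of that helper under the stated assumptions; it relies only on the oConn.ChainSync() accessor visible in this thread (imports: testing, time):

func waitForChainSyncServer(t *testing.T, oConn *ouroboros.Connection, timeout time.Duration) {
    t.Helper()
    deadline := time.Now().Add(timeout)
    for time.Now().Before(deadline) {
        if cs := oConn.ChainSync(); cs != nil && cs.Server != nil {
            return // protocol registered; safe to proceed
        }
        time.Sleep(5 * time.Millisecond)
    }
    t.Fatal("timed out waiting for ChainSync server")
}

A barrier signalled from the callback, as suggested above, would be fully deterministic; this polling variant is the smaller change.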

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3e78cb5 and 29a9bd1.

📒 Files selected for processing (1)
  • connection_test.go (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
connection_test.go (8)
connection.go (2)
  • NewConnection (101-124)
  • New (127-129)
protocol/handshake/messages.go (2)
  • NewMsgProposeVersions (64-80)
  • NewMsgAcceptVersion (88-102)
protocol/versions.go (2)
  • ProtocolVersionMap (24-24)
  • ProtocolVersionNtCOffset (20-20)
protocol/versiondata.go (1)
  • VersionDataNtC9to14 (48-48)
protocol/chainsync/messages.go (1)
  • NewMsgFindIntersect (224-232)
protocol/chainsync/chainsync.go (4)
  • ProtocolIdNtC (31-31)
  • New (232-240)
  • NewConfig (246-261)
  • ChainSync (194-197)
protocol/common/types.go (1)
  • Point (23-28)
connection_options.go (5)
  • WithConnection (36-40)
  • WithNetworkMagic (50-54)
  • WithServer (64-68)
  • WithChainSyncConfig (131-135)
  • WithNodeToNode (78-82)
🪛 GitHub Actions: go-test
connection_test.go

[error] 205-219: TestErrorHandlingWithActiveProtocols: timed out waiting for protocol to stop. Found large set of unexpected goroutines traces in the test output.


[error] 254-260: TestErrorHandlingWithMultipleProtocols: Received connection error with multiple active protocols: EOF. Unexpected goroutines reported in test.


[error] 300-300: TestBasicErrorHandling: Unexpected goroutines reported during test teardown (goroutine leaks detected).

Signed-off-by: Jenita <jkawan@blinklabs.io>
coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
protocol/chainsync/client.go (1)

122-137: Fix panic: send on closed channel in ChainSync client

readyForNextBlockChan is closed on Done, but handleRollForward/Backward can still send, causing “send on closed channel” (seen in CI). Don’t close the channel; instead, make syncLoop exit on Done.

Apply:

@@ func (c *Client) Start() {
-        go func() {
-            <-c.DoneChan()
-            close(c.readyForNextBlockChan)
-        }()
+        go func() {
+            <-c.DoneChan()
+            // No channel close; syncLoop exits on Done
+        }()
@@ func (c *Client) syncLoop() {
-    for {
-        // Wait for a block to be received
-        if ready, ok := <-c.readyForNextBlockChan; !ok {
-            // Channel is closed, which means we're shutting down
-            return
-        } else if !ready {
-            // Sync was cancelled
-            return
-        }
+    for {
+        // Wait for a block to be received or shutdown
+        var ready bool
+        select {
+        case <-c.DoneChan():
+            return
+        case ready = <-c.readyForNextBlockChan:
+            // continue
+        }
+        if !ready {
+            // Sync was cancelled
+            return
+        }
@@ func (c *Client) handleRollForward(...) error {
-    // Signal that we're ready for the next block
-    c.readyForNextBlockChan <- true
+    // Signal that we're ready for the next block
+    c.readyForNextBlockChan <- true
     return nil
@@ func (c *Client) handleRollBackward(msg protocol.Message) error {
-    // Signal that we're ready for the next block
-    c.readyForNextBlockChan <- true
+    // Signal that we're ready for the next block
+    c.readyForNextBlockChan <- true
     return nil

Note: Since we no longer close the channel, no additional guards are required at send sites; syncLoop exits via Done.

Also applies to: 440-456, 724-736, 765-767

♻️ Duplicate comments (4)
connection_test.go (3)

207-214: Tighten assertion: any error after protocols stopped should fail

When protocols are stopped, closing the connection must not surface an error. Fail the test if any error is received instead of logging.

-        select {
-        case err := <-oConn.ErrorChan():
-            t.Logf("Received error during shutdown: %s", err)
-        case <-time.After(500 * time.Millisecond):
-            t.Log("No connection error received (expected when protocols are stopped)")
-        }
+        select {
+        case err := <-oConn.ErrorChan():
+            t.Fatalf("unexpected connection error after protocols stopped: %v", err)
+        case <-time.After(500 * time.Millisecond):
+            // OK: no error
+        }

224-256: Fix NtN handshake role mismatch and test semantics; expect no error when no traffic

The mock sends a handshake response while oConn is not configured as server, causing client–client stalemate risk. Also, this test asserts an error without exchanging any mini‑protocol messages, which contradicts the PR behavior (“started but no messages exchanged” → no error).

Apply:

 mockConn := ouroboros_mock.NewConnection(
     ouroboros_mock.ProtocolRoleClient,
     []ouroboros_mock.ConversationEntry{
-        ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
-        ouroboros_mock.ConversationEntryHandshakeNtNResponse,
+        // Client proposes; server (our oConn) will respond
+        ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
     },
 )

 oConn, err := ouroboros.New(
     ouroboros.WithConnection(mockConn),
     ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic),
-    ouroboros.WithNodeToNode(true),
+    ouroboros.WithNodeToNode(true),
+    ouroboros.WithServer(true),
 )
@@
-// Wait for handshake to complete
-time.Sleep(100 * time.Millisecond)
+// Optionally wait briefly or add a handshake barrier if available
+time.Sleep(100 * time.Millisecond)
@@
-// Should receive error since protocols are active
-select {
-case err := <-oConn.ErrorChan():
-    if err == nil {
-        t.Fatal("expected connection error, got nil")
-    }
-    t.Logf("Received connection error with multiple active protocols: %s", err)
-case <-time.After(2 * time.Second):
-    t.Error("timed out waiting for connection error")
-}
+// No mini‑protocol traffic: closing should not emit an error
+select {
+case err := <-oConn.ErrorChan():
+    t.Fatalf("unexpected error with no active protocol traffic: %v", err)
+case <-time.After(750 * time.Millisecond):
+    // OK: no error
+}

199-205: Typo in comment

“stoppeds” → “stopped”.

-        // Protocol is stoppeds
+        // Protocol is stopped
connection.go (1)

256-300: Fix inverted EOF handling logic and include ErrUnexpectedEOF as connection closure

The logic for determining when to propagate EOF is backwards. When acting as a server (c.server=true), the code unconditionally returns the error instead of checking if server-side protocols are active. Conversely, when acting as a client, it checks server-side protocol states instead of client-side. This inverts the intended behavior and will suppress errors that should propagate.

Additionally, io.ErrUnexpectedEOF should be treated alongside io.EOF as a normal connection closure signal, since both indicate the connection is closed (protocol.go currently handles it as an incomplete message state, but at the connection level it signals closure).

Apply:

 func (c *Connection) handleConnectionError(err error) error {
   if err == nil {
     return nil
   }
-  // Only propagate EOF errors when acting as a client with active server-side protocols
-  if errors.Is(err, io.EOF) {
-    // Check if we have any active server-side protocols
-    if c.server {
-      return err
-    }
-
-    // For clients, only propagate EOF if we have active server protocols
-    hasActiveServerProtocols := false
-    if c.chainSync != nil && c.chainSync.Server != nil && !c.chainSync.Server.IsDone() {
-      hasActiveServerProtocols = true
-    }
-    if c.blockFetch != nil && c.blockFetch.Server != nil && !c.blockFetch.Server.IsDone() {
-      hasActiveServerProtocols = true
-    }
-    if c.txSubmission != nil && c.txSubmission.Server != nil && !c.txSubmission.Server.IsDone() {
-      hasActiveServerProtocols = true
-    }
-    if c.localStateQuery != nil && c.localStateQuery.Server != nil && !c.localStateQuery.Server.IsDone() {
-      hasActiveServerProtocols = true
-    }
-    if c.localTxMonitor != nil && c.localTxMonitor.Server != nil && !c.localTxMonitor.Server.IsDone() {
-      hasActiveServerProtocols = true
-    }
-    if c.localTxSubmission != nil && c.localTxSubmission.Server != nil && !c.localTxSubmission.Server.IsDone() {
-      hasActiveServerProtocols = true
-    }
-
-    if hasActiveServerProtocols {
-      return err
-    }
-
-    // EOF with no active server protocols is normal connection closure
-    return nil
-  }
+  // Treat EOF/UnexpectedEOF as normal closure unless our role still has active protocols
+  if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
+    hasActive := false
+    if c.server {
+      // Server: check server-side protocols
+      if c.chainSync != nil && c.chainSync.Server != nil && !c.chainSync.Server.IsDone() { hasActive = true }
+      if c.blockFetch != nil && c.blockFetch.Server != nil && !c.blockFetch.Server.IsDone() { hasActive = true }
+      if c.txSubmission != nil && c.txSubmission.Server != nil && !c.txSubmission.Server.IsDone() { hasActive = true }
+      if c.localStateQuery != nil && c.localStateQuery.Server != nil && !c.localStateQuery.Server.IsDone() { hasActive = true }
+      if c.localTxMonitor != nil && c.localTxMonitor.Server != nil && !c.localTxMonitor.Server.IsDone() { hasActive = true }
+      if c.localTxSubmission != nil && c.localTxSubmission.Server != nil && !c.localTxSubmission.Server.IsDone() { hasActive = true }
+    } else {
+      // Client: check client-side protocols
+      if c.chainSync != nil && c.chainSync.Client != nil && !c.chainSync.Client.IsDone() { hasActive = true }
+      if c.blockFetch != nil && c.blockFetch.Client != nil && !c.blockFetch.Client.IsDone() { hasActive = true }
+      if c.txSubmission != nil && c.txSubmission.Client != nil && !c.txSubmission.Client.IsDone() { hasActive = true }
+      if c.localStateQuery != nil && c.localStateQuery.Client != nil && !c.localStateQuery.Client.IsDone() { hasActive = true }
+      if c.localTxMonitor != nil && c.localTxMonitor.Client != nil && !c.localTxMonitor.Client.IsDone() { hasActive = true }
+      if c.localTxSubmission != nil && c.localTxSubmission.Client != nil && !c.localTxSubmission.Client.IsDone() { hasActive = true }
+    }
+    if hasActive {
+      return err
+    }
+    return nil
+  }
   // For non-EOF errors, always propagate
   return err
 }
🧹 Nitpick comments (1)
connection_test.go (1)

117-119: Avoid sleep-based readiness; use a barrier from the callback

Replace time.Sleep with a readiness signal from the FindIntersect callback to make the test deterministic and goleak-safe.

Example:

-        // Wait a bit for protocol to start
-        time.Sleep(100 * time.Millisecond)
+        // Barrier to ensure callback entered
+        started := make(chan struct{}, 1)
+        // pass via config context or test-scoped var; e.g., wrap FindIntersectFunc to close(started)
+        // then here:
+        select {
+        case <-started:
+        case <-time.After(1 * time.Second):
+            t.Fatal("timeout waiting for ChainSync to start")
+        }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 29a9bd1 and 728d9b1.

📒 Files selected for processing (6)
  • connection.go (5 hunks)
  • connection_test.go (2 hunks)
  • protocol/chainsync/chainsync.go (2 hunks)
  • protocol/chainsync/client.go (2 hunks)
  • protocol/chainsync/server.go (1 hunks)
  • protocol/protocol.go (9 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
connection_test.go (6)
connection.go (2)
  • NewConnection (104-127)
  • New (130-132)
protocol/protocol.go (2)
  • ProtocolRoleClient (91-91)
  • New (121-132)
protocol/chainsync/messages.go (1)
  • NewMsgFindIntersect (224-232)
protocol/chainsync/chainsync.go (6)
  • ProtocolIdNtC (32-32)
  • New (234-242)
  • NewConfig (248-263)
  • WithFindIntersectFunc (291-297)
  • CallbackContext (214-219)
  • ChainSync (195-198)
protocol/common/types.go (1)
  • Point (23-28)
connection_options.go (5)
  • WithConnection (36-40)
  • WithNetworkMagic (50-54)
  • WithServer (64-68)
  • WithChainSyncConfig (131-135)
  • WithNodeToNode (78-82)
connection.go (3)
protocol/chainsync/server.go (1)
  • Server (27-33)
muxer/muxer.go (2)
  • ConnectionClosedError (72-75)
  • Muxer (57-70)
protocol/protocol.go (1)
  • ProtocolOptions (96-106)
protocol/protocol.go (2)
protocol/state.go (3)
  • State (32-35)
  • StateMap (69-69)
  • AgencyNone (26-26)
protocol/message.go (1)
  • Message (18-22)
protocol/chainsync/client.go (1)
protocol/chainsync/messages.go (1)
  • NewMsgDone (270-277)
🪛 GitHub Actions: go-test
connection.go

[error] 81-81: panic: send on closed channel

🔇 Additional comments (2)
protocol/protocol.go (1)

134-153: State tracking implementation looks good

currentState with RW lock, CurrentState()/IsDone() (including initial state) and guarded updates in stateLoop align with lifecycle checks needed by centralized error handling.

Also applies to: 493-503, 552-571, 592-599

protocol/chainsync/chainsync.go (1)

18-26: Context propagation in callbacks looks good

Embedding context.Context in CallbackContext enables clean cancellation and aligns with the connection’s context wiring.

Also applies to: 213-219

wolf31o2 (Member) commented:

I'm not sure if it's helpful, but I have some (abandoned) code on a similar path here: #1244

Signed-off-by: Jenita <jkawan@blinklabs.io>
coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
connection_test.go (1)

254-307: Test contradicts PR objectives—expects error when no protocol messages exchanged.

Per the PR objectives from issue #1112, "Apply the same non-error behavior when the protocols have been started but no messages were exchanged before the connection close."

This test only performs a handshake (lines 261-263) with no mini-protocol traffic, yet expects to receive an error (lines 281-303). According to the PR requirements, closing after handshake-only should NOT return an error.

Apply this diff to align the test with PR semantics:

-	// Should receive error since protocols were active
+	// Should NOT receive error since no mini-protocol messages were exchanged
 	timeout := time.After(2 * time.Second)
 	for {
 		select {
 		case err, ok := <-oConn.ErrorChan():
 			if !ok {
 				t.Log("Error channel closed")
 				goto done
 			}
 			if err == nil {
-				t.Error("received nil error")
-				continue
+				continue // Ignore nil errors
+			}
+			if err != nil {
+				t.Fatalf("unexpected error with no protocol traffic: %v", err)
 			}
-			t.Logf("Received connection error with multiple active protocols: %s", err)
-			if strings.Contains(err.Error(), "EOF") ||
-				strings.Contains(err.Error(), "use of closed network connection") {
-				goto done
-			}
+		case <-time.After(500 * time.Millisecond):
+			// OK: no error received
+			goto done
 		case <-timeout:
-			t.Error("timed out waiting for connection error")
+			t.Fatal("test timeout")
 			goto done
 		}
 	}
🧹 Nitpick comments (1)
connection_test.go (1)

118-119: Replace arbitrary sleep with protocol state check.

The 100ms sleep at line 119 is arbitrary and could cause flakiness. Consider polling the protocol's current state or using the mock's synchronization mechanisms to confirm the ChainSync FindIntersect has been processed.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 728d9b1 and 6163d32.

📒 Files selected for processing (1)
  • connection_test.go (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
connection_test.go (4)
protocol/handshake/messages.go (3)
  • NewMsgProposeVersions (64-80)
  • NewMsgFromCbor (39-57)
  • NewMsgAcceptVersion (88-102)
protocol/versiondata.go (1)
  • VersionDataNtC9to14 (48-48)
protocol/chainsync/messages.go (3)
  • NewMsgFromCbor (48-84)
  • NewMsgFindIntersect (224-232)
  • NewMsgDone (270-277)
protocol/common/types.go (1)
  • Point (23-28)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: cubic · AI code reviewer
🔇 Additional comments (1)
connection_test.go (1)

309-347: LGTM!

Both subtests are straightforward and correctly validate basic error handling scenarios: dial failure and idempotent close operations.

Signed-off-by: jkawan <kawanjenita@outlook.com>
Signed-off-by: Jenita <jkawan@blinklabs.io>
cubic-dev-ai bot left a comment

3 issues found across 11 files

Prompt for AI agents (all 3 issues)

Understand the root cause of the following 3 issues and fix them.


<file name="connection_test.go">

<violation number="1" location="connection_test.go:130">
Fail the test instead of just logging when ErrorChan closes before any error is received so the expected connection error is actually enforced.</violation>
</file>

<file name="connection.go">

<violation number="1" location="connection.go:352">
Always call `Close()` when the muxer reports connection termination; otherwise normal EOFs leave the connection running and leak resources.</violation>
</file>

<file name="protocol/protocol.go">

<violation number="1" location="protocol/protocol.go:151">
`IsDone` now reports “done” whenever the protocol is merely back in its initial state, so clients no longer send the required Done messages during `Stop()`, leaving the remote side hanging.</violation>
</file>


cubic-dev-ai bot left a comment

5 issues found across 11 files

Prompt for AI agents (all 5 issues)

Understand the root cause of the following 5 issues and fix them.


<file name="protocol/protocol.go">

<violation number="1" location="protocol/protocol.go:142">
`CurrentState` reads the shared state under a brand-new `stateMutex`, but state transitions still write under `currentStateMu`, so reads and writes are unsynchronized and can race.</violation>
</file>

<file name="connection.go">

<violation number="1" location="connection.go:368">
Graceful remote disconnects no longer trigger Close(), leaving doneChan and connection resources hanging because handleConnectionError suppresses io.EOF.
Move the Close() call outside the handledErr check (or otherwise ensure it is invoked even when handleConnectionError returns nil) so shutdown still runs on remote EOF.</violation>
</file>

<file name="protocol/blockfetch/blockfetch_test.go">

<violation number="1" location="protocol/blockfetch/blockfetch_test.go:51">
`testConn.Read` returns `(0, nil)`, violating the `io.Reader` contract and causing muxer read loops to spin forever instead of seeing EOF on close.</violation>
</file>

<file name="protocol/chainsync/chainsync_test.go">

<violation number="1" location="protocol/chainsync/chainsync_test.go:49">
testConn.Read returns 0 bytes with nil error, causing the muxer read loop to spin forever instead of blocking like a real net.Conn.</violation>
</file>

<file name="connection_test.go">

<violation number="1" location="connection_test.go:86">
The "ErrorsIgnoredWhenProtocolsStopped" sub-test also runs the connection as a server, so it cannot observe the client-side error suppression behavior it purports to validate.</violation>
</file>


cubic-dev-ai bot left a comment

4 issues found across 11 files

Prompt for AI agents (all 4 issues)

Understand the root cause of the following 4 issues and fix them.


<file name="protocol/blockfetch/blockfetch_test.go">

<violation number="1" location="protocol/blockfetch/blockfetch_test.go:51">
`testConn.Read` returns 0 bytes with a nil error, violating the `io.Reader` contract and causing the mux read loop to busy-spin.</violation>
</file>

<file name="connection.go">

<violation number="1" location="connection.go:368">
A normal remote EOF (handledErr == nil) no longer triggers Close(), so the connection never shuts down and goroutines leak. Close the connection even when the EOF is being suppressed.</violation>
</file>

<file name="protocol/txsubmission/txsubmission_test.go">

<violation number="1" location="protocol/txsubmission/txsubmission_test.go:51">
`testConn.Read` returns 0 bytes with a nil error, violating the `io.Reader` contract and causing callers to spin endlessly.</violation>
</file>

<file name="protocol/chainsync/chainsync_test.go">

<violation number="1" location="protocol/chainsync/chainsync_test.go:49">
testConn.Read must not return (0, nil) because it makes muxer.Start spin and violates the io.Reader contract.</violation>
</file>


}
}

func (c *testConn) Read(b []byte) (n int, err error) { return 0, nil }
cubic-dev-ai bot commented Nov 24, 2025

testConn.Read must not return (0, nil) because it makes muxer.Start spin and violates the io.Reader contract.

Prompt for AI agents
Address the following comment on protocol/chainsync/chainsync_test.go at line 49:

<comment>testConn.Read must not return (0, nil) because it makes muxer.Start spin and violates the io.Reader contract.</comment>

<file context>
@@ -0,0 +1,167 @@
+	}
+}
+
+func (c *testConn) Read(b []byte) (n int, err error) { return 0, nil }
+func (c *testConn) Write(b []byte) (n int, err error) {
+	select {
</file context>

Signed-off-by: Jenita <jkawan@blinklabs.io>
coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
connection_test.go (1)

227-249: Test logic may not align with PR objectives.

Per the PR description and linked issue #1112, closing the connection after protocols are explicitly stopped should NOT return any error. However, this test still accepts EOF and use of closed network connection errors (lines 237-240) rather than failing on any error.

If the goal is to verify that no errors are propagated when protocols are stopped, consider failing on any received error:

-			if err != nil {
-				if !strings.Contains(err.Error(), "EOF") &&
-					!strings.Contains(err.Error(), "use of closed network connection") {
-					t.Errorf("Unexpected error during shutdown: %s", err)
-				}
-			}
+			if err != nil {
+				t.Fatalf("unexpected error after protocols stopped: %v", err)
+			}

If certain connection-level errors are still expected and acceptable after protocol shutdown, please document this distinction explicitly.

🧹 Nitpick comments (3)
protocol/protocol.go (2)

112-116: Unused field stateRespChan is always nil.

The new stateRespChan field is never populated—transitionState() at line 697 always passes nil. If this is intentional scaffolding for future use, consider adding a brief comment. Otherwise, remove it until needed.


138-156: Approve with note on duplicate accessor.

The CurrentState() implementation correctly uses currentStateMu for thread-safe access, addressing the race condition concern from previous reviews.

Note: There's also a private getCurrentState() at lines 247-251 with identical logic. Consider consolidating to reduce duplication.
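
A sketch of the suggested consolidation, assuming the private accessor is otherwise identical (field names follow this thread):

// CurrentState returns the protocol's current state under the read lock.
func (p *Protocol) CurrentState() State {
    p.currentStateMu.RLock()
    defer p.currentStateMu.RUnlock()
    return p.currentState
}

// getCurrentState remains only as a thin alias until internal callers migrate.
func (p *Protocol) getCurrentState() State {
    return p.CurrentState()
}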

connection_test.go (1)

128-146: Consider failing when error channel closes unexpectedly.

When !ok at line 129, the test logs and exits. Per previous review feedback, if the channel closes before receiving the expected error, this could silently pass when it should fail.

 			case err, ok := <-oConn.ErrorChan():
 				if !ok {
-					t.Log("Error channel closed")
+					t.Error("Error channel closed before receiving expected error")
 					goto done
 				}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f6b9134 and aca6320.

📒 Files selected for processing (2)
  • connection_test.go (2 hunks)
  • protocol/protocol.go (6 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
protocol/protocol.go (1)
protocol/state.go (3)
  • State (32-35)
  • StateMap (70-70)
  • AgencyNone (26-26)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: cubic · AI code reviewer
🔇 Additional comments (8)
protocol/protocol.go (5)

18-31: LGTM!

The new context import is required to support the Context field in ProtocolOptions.


40-61: LGTM!

Adding currentState as a struct field addresses the previous review feedback and enables thread-safe access via the existing currentStateMu mutex.


99-110: LGTM!

Adding the Context field to ProtocolOptions enables connection-level context propagation to protocols.


695-700: LGTM!

The transitionState function correctly initializes the transition struct with the error channel and leaves stateRespChan as nil (consistent with current usage).


158-166: Function GetDoneState() is unused/dead code; the identified concern about non-determinism is technically valid but practically moot.

Verification confirms:

  • GetDoneState() has zero call sites in the entire codebase (including tests)
  • Each protocol state map contains exactly one state with AgencyNone (the done state), confirming the design assumption
  • The non-deterministic map iteration concern is technically valid but irrelevant since the function is never called

The code is correct as-is. Consider removing GetDoneState() as dead code if it's not part of a planned public API expansion.

connection_test.go (3)

36-81: LGTM!

The test correctly sets up the role pairing (mock as client, oConn as server) and uses explicit handshake messages. The callback now blocks on ctx.Done() instead of time.Sleep(), addressing previous feedback.


315-353: LGTM!

The basic error handling tests for dial failure and double close are straightforward and properly clean up resources.


258-313: Incorrect review comment – protocols auto-start after handshake in this test.

The test correctly expects errors. NtN protocols auto-start after handshake (controlled by delayProtocolStart, which defaults to false). With WithNodeToNode(true) and no server configuration, the client-side protocols (blockFetch, chainSync, txSubmission, leiosNotify, leiosFetch, etc.) automatically start at line 476 during the handshake callback. These started protocols are "active," so expecting errors when the connection closes is correct—no additional protocol messages are needed for them to be active.

Likely an incorrect or invalid review comment.

jkawan and others added 2 commits November 24, 2025 18:11
Signed-off-by: jkawan <kawanjenita@outlook.com>
Signed-off-by: Jenita <jkawan@blinklabs.io>
coderabbitai bot (Contributor) left a comment

Actionable comments posted: 3

🧹 Nitpick comments (5)
protocol/localstatequery/client_test.go (2)

115-117: Consider more deterministic synchronization.

Using time.Sleep for test synchronization can make tests slower and non-deterministic. While this approach currently works (no goleak failures reported), it adds technical debt.

Consider alternatives such as:

  • Exposing a shutdown completion channel from the mock (a minimal sketch follows this list)
  • Using sync.WaitGroup to track goroutine lifecycle
  • Polling with a timeout for a specific state condition
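
A minimal sketch of the first alternative, as a hypothetical helper: it assumes the mock could expose a channel that is closed once its goroutines exit (imports: testing, time).

func waitForMockShutdown(t *testing.T, done <-chan struct{}) {
    t.Helper()
    select {
    case <-done:
        // mock fully torn down; goleak.VerifyNone is now deterministic
    case <-time.After(2 * time.Second):
        t.Fatal("timed out waiting for mock shutdown")
    }
}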

129-131: Similar synchronization concern.

Same timing-based synchronization issue as the earlier sleep. While functional, deterministic shutdown signaling would be more robust.

protocol/localtxmonitor/client_test.go (1)

96-98: Timing-based synchronization in tests.

Similar to other protocol tests, these sleeps are a timing workaround. While tests pass, deterministic synchronization would be more maintainable.

Also applies to: 110-112

protocol/localtxsubmission/client_test.go (1)

89-91: Timing delays for test synchronization.

These sleeps work for this test suite but represent the same technical debt pattern seen across other protocol tests.

Also applies to: 102-104

protocol/keepalive/client_test.go (1)

118-119: Multiple instances of timing-based synchronization.

All three test functions use the same sleep-based teardown pattern. While tests pass, this multiplies the technical debt and increases total test execution time.

Consider extracting the teardown logic into a helper that uses deterministic synchronization, reducing duplication and improving test reliability.

Also applies to: 131-133, 184-186, 198-200, 243-245, 257-259
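
One possible shape for that helper, sketched under the assumption that the connection closes its error channel on shutdown (imports: sync):

// drainErrors consumes the error channel on a tracked goroutine and returns
// a wait function the test can call before its goleak check.
func drainErrors(errCh <-chan error) (wait func()) {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        for range errCh {
            // discard late errors; loop ends when the channel is closed
        }
    }()
    return wg.Wait
}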

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between aca6320 and c2e7504.

📒 Files selected for processing (6)
  • protocol/blockfetch/client_test.go (2 hunks)
  • protocol/chainsync/client_test.go (2 hunks)
  • protocol/keepalive/client_test.go (6 hunks)
  • protocol/localstatequery/client_test.go (2 hunks)
  • protocol/localtxmonitor/client_test.go (2 hunks)
  • protocol/localtxsubmission/client_test.go (2 hunks)
🧰 Additional context used
🪛 GitHub Actions: go-test
protocol/blockfetch/client_test.go

[error] 51-51: TestGetBlock: found unexpected goroutines (see test stack trace in output)


[error] 108-109: TestGetBlock and related tests failed: found unexpected goroutines during test execution

protocol/chainsync/client_test.go

[error] 109-109: TestGetCurrentTip: found unexpected goroutines. Command: 'go test ./...'.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: cubic · AI code reviewer
  • GitHub Check: Analyze (go)
🔇 Additional comments (1)
protocol/blockfetch/client_test.go (1)

91-93: Goroutine lifecycle is properly managed; the earlier review comment was based on incorrect line references.

The embedded scratchpad references incorrect line numbers (51-51, 108-109) that don't correspond to actual issues in the test. The code shows proper goroutine cleanup:

  • Line 49: defer goleak.VerifyNone(t) will catch any actual goroutine leaks
  • Lines 56-62 and 72-79: Test goroutines properly exit when channels close
  • Connection goroutines (connection.go lines 313-340): Use sync.WaitGroup with proper defer Done()
  • Shutdown sequence: Close() triggers doneChan, goroutines exit, shutdown() calls waitGroup.Wait()
  • Lines 93 and 107: Sleeps serve different purposes—mock shutdown (50ms) and graceful exit timing (100ms)—not masking cleanup failures

No evidence of actual goroutine leaks was provided. The code architecture handles cleanup correctly.
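As a reference for that shutdown sequence, here is a stripped-down sketch of the doneChan/WaitGroup pattern; the names are simplified placeholders, not the connection's actual fields:

package connsketch

import "sync"

type worker struct {
	doneChan chan struct{}
	work     chan int
	wg       sync.WaitGroup
}

func newWorker() *worker {
	w := &worker{doneChan: make(chan struct{}), work: make(chan int)}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for {
			select {
			case <-w.doneChan:
				return // Close() signalled shutdown
			case v := <-w.work:
				_ = v // handle one unit of protocol work
			}
		}
	}()
	return w
}

// Close signals the goroutine via doneChan, then waits for it to exit, so a
// goleak check run afterwards never observes a leftover goroutine.
func (w *worker) Close() {
	close(w.doneChan)
	w.wg.Wait()
}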

Jenita added 5 commits November 29, 2025 21:10
Signed-off-by: Jenita <jkawan@blinklabs.io>
Signed-off-by: Jenita <jkawan@blinklabs.io>
Signed-off-by: Jenita <jkawan@blinklabs.io>
Signed-off-by: Jenita <jkawan@blinklabs.io>
Signed-off-by: Jenita <jkawan@blinklabs.io>
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (2)
connection.go (2)

277-321: Fix inverted EOF handling logic for server/client roles.

The EOF handling logic is backwards:

  • Line 286-287: When acting as server (c.server == true), the code unconditionally returns the error without checking if server-side protocols are done.
  • Lines 290-309: When acting as client (c.server == false), the code checks server-side protocols (.Server fields) instead of client-side protocols (.Client fields).

This breaks the PR objective: servers will always propagate EOF even when all protocols have cleanly shut down, and clients check the wrong protocols.

Additionally, io.ErrUnexpectedEOF should be handled alongside io.EOF, as the muxer may return it for partial reads.

Apply this fix (based on past review feedback):

 func (c *Connection) handleConnectionError(err error) error {
   if err == nil {
     return nil
   }
-
-  // Only propagate EOF errors when acting as a client with active server-side protocols
-  if errors.Is(err, io.EOF) {
-    // Check if we have any active server-side protocols
-    if c.server {
-      return err
-    }
-
-    // For clients, only propagate EOF if we have active server protocols
-    hasActiveServerProtocols := false
-    if c.chainSync != nil && c.chainSync.Server != nil && !c.chainSync.Server.IsDone() {
-      hasActiveServerProtocols = true
-    }
-    if c.blockFetch != nil && c.blockFetch.Server != nil && !c.blockFetch.Server.IsDone() {
-      hasActiveServerProtocols = true
-    }
-    if c.txSubmission != nil && c.txSubmission.Server != nil && !c.txSubmission.Server.IsDone() {
-      hasActiveServerProtocols = true
-    }
-    if c.localStateQuery != nil && c.localStateQuery.Server != nil && !c.localStateQuery.Server.IsDone() {
-      hasActiveServerProtocols = true
-    }
-    if c.localTxMonitor != nil && c.localTxMonitor.Server != nil && !c.localTxMonitor.Server.IsDone() {
-      hasActiveServerProtocols = true
-    }
-    if c.localTxSubmission != nil && c.localTxSubmission.Server != nil && !c.localTxSubmission.Server.IsDone() {
-      hasActiveServerProtocols = true
-    }
-
-    if hasActiveServerProtocols {
-      return err
-    }
-
-    // EOF with no active server protocols is normal connection closure
-    return nil
-  }
-
+  // Treat EOF/UnexpectedEOF as connection closed, decide based on active protocols for our role
+  if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
+    hasActive := false
+    if c.server {
+      // Server: check server-side protocols
+      if c.chainSync != nil && c.chainSync.Server != nil && !c.chainSync.Server.IsDone() {
+        hasActive = true
+      }
+      if c.blockFetch != nil && c.blockFetch.Server != nil && !c.blockFetch.Server.IsDone() {
+        hasActive = true
+      }
+      if c.txSubmission != nil && c.txSubmission.Server != nil && !c.txSubmission.Server.IsDone() {
+        hasActive = true
+      }
+      if c.localStateQuery != nil && c.localStateQuery.Server != nil && !c.localStateQuery.Server.IsDone() {
+        hasActive = true
+      }
+      if c.localTxMonitor != nil && c.localTxMonitor.Server != nil && !c.localTxMonitor.Server.IsDone() {
+        hasActive = true
+      }
+      if c.localTxSubmission != nil && c.localTxSubmission.Server != nil && !c.localTxSubmission.Server.IsDone() {
+        hasActive = true
+      }
+    } else {
+      // Client: check client-side protocols
+      if c.chainSync != nil && c.chainSync.Client != nil && !c.chainSync.Client.IsDone() {
+        hasActive = true
+      }
+      if c.blockFetch != nil && c.blockFetch.Client != nil && !c.blockFetch.Client.IsDone() {
+        hasActive = true
+      }
+      if c.txSubmission != nil && c.txSubmission.Client != nil && !c.txSubmission.Client.IsDone() {
+        hasActive = true
+      }
+      if c.localStateQuery != nil && c.localStateQuery.Client != nil && !c.localStateQuery.Client.IsDone() {
+        hasActive = true
+      }
+      if c.localTxMonitor != nil && c.localTxMonitor.Client != nil && !c.localTxMonitor.Client.IsDone() {
+        hasActive = true
+      }
+      if c.localTxSubmission != nil && c.localTxSubmission.Client != nil && !c.localTxSubmission.Client.IsDone() {
+        hasActive = true
+      }
+    }
+    // Propagate only if a protocol is still active; otherwise swallow as normal closure
+    if hasActive {
+      return err
+    }
+    return nil
+  }
   // For non-EOF errors, always propagate
   return err
 }

362-384: Resource leak: Close() not called when EOF is suppressed.

When handleConnectionError returns nil (suppressing a graceful EOF), c.Close() on Line 383 is never called. This causes:

  • The connection never shuts down
  • doneChan remains open indefinitely
  • Goroutines and resources leak

The muxer reporting an error (even a graceful EOF) means the connection is no longer usable and must be closed.

Apply this fix:

         case err, ok := <-c.muxer.ErrorChan():
           // Break out of goroutine if muxer's error channel is closed
           if !ok {
             return
           }

           // Use centralized connection error handling
-          if handledErr := c.handleConnectionError(err); handledErr != nil {
+          handledErr := c.handleConnectionError(err)
+          // Always close connection when muxer reports termination
+          defer c.Close()
+          if handledErr != nil {
             var connErr *muxer.ConnectionClosedError
             if errors.As(handledErr, &connErr) {
               // Pass through ConnectionClosedError from muxer (non-blocking)
               select {
               case c.errorChan <- handledErr:
               case <-c.doneChan:
                 // Connection is shutting down, don't block
                 return
               }
             } else {
               // Wrap error message to denote it comes from the muxer (non-blocking)
               select {
               case c.errorChan <- fmt.Errorf("muxer error: %w", handledErr):
               case <-c.doneChan:
                 // Connection is shutting down, don't block
                 return
               }
             }
-            // Close connection on muxer errors
-            c.Close()
           }
         }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 42f7142 and fdaceaf.

📒 Files selected for processing (1)
  • connection.go (6 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
connection.go (3)
muxer/muxer.go (2)
  • ConnectionClosedError (72-75)
  • Muxer (57-70)
protocol/protocol.go (1)
  • ProtocolOptions (100-110)
connection/id.go (1)
  • ConnectionId (22-25)
🪛 GitHub Actions: go-test
connection.go

[error] 82-82: panic: send on closed channel

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: cubic · AI code reviewer
  • GitHub Check: Analyze (go)
🔇 Additional comments (2)
connection.go (2)

263-267: Good addition for orderly cleanup.

Draining the muxer's error channel ensures cleanup completes before shutdown finishes, preventing potential goroutine leaks.
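A sketch of that drain step, assuming the muxer closes its error channel once it finishes; drainErrors is an illustrative name, not the repo's API:

// drainErrors discards any remaining errors so the producer's final sends
// complete and its goroutine can exit; the range loop ends when the channel
// is closed by the muxer.
func drainErrors(errCh <-chan error) {
	for range errCh {
		// already shutting down; drop late errors
	}
}

shutdown would then call drainErrors(c.muxer.ErrorChan()) after signalling the muxer to stop, ensuring the error-producing goroutine is gone before shutdown returns.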


334-334: Context lifecycle integration looks correct.

The context is properly created, cancelled on shutdown, and propagated to protocol options, aligning with the connection's lifecycle.

Also applies to: 339-339, 392-392

connection.go Outdated
if errors.As(handledErr, &connErr) {
	// Pass through ConnectionClosedError from muxer (non-blocking)
	select {
	case c.errorChan <- handledErr:
Contributor

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find connection.go file
fd -t f "connection.go"

Repository: blinklabs-io/gouroboros

Length of output: 80


🏁 Script executed:

#!/bin/bash
# Get file info
wc -l connection.go

# Read the ctx field declaration area (around line 82)
echo "=== Lines 75-90 (ctx field area) ==="
sed -n '75,90p' connection.go

# Read the errorChan close area (around line 274)
echo -e "\n=== Lines 270-280 (shutdown close area) ==="
sed -n '270,280p' connection.go

# Read the send areas (lines 360-380)
echo -e "\n=== Lines 360-380 (send operations) ==="
sed -n '360,380p' connection.go

Repository: blinklabs-io/gouroboros

Length of output: 1811


🏁 Script executed:

#!/bin/bash
# Find doneChan declaration and close
echo "=== Search for doneChan ==="
rg -n 'doneChan' connection.go -B2 -A2

# Find all goroutine launches
echo -e "\n=== Search for goroutine launches (go func) ==="
rg -n 'go func|go c\.' connection.go -A5

# Look at the full shutdown function
echo -e "\n=== Full shutdown/close area ==="
sed -n '265,285p' connection.go

Repository: blinklabs-io/gouroboros

Length of output: 2381


🏁 Script executed:

#!/bin/bash
# Read the full error handling goroutine
echo "=== Complete error handling goroutine (lines 350-390) ==="
sed -n '350,390p' connection.go

# Also check if there are other goroutines that might interact with errorChan
echo -e "\n=== Search for all defer statements in error handling ==="
rg -n 'defer' connection.go -B3 -A1

# Check the full shutdown sequence
echo -e "\n=== Full shutdown function ==="
sed -n '265,280p' connection.go

Repository: blinklabs-io/gouroboros

Length of output: 2092


🏁 Script executed:

#!/bin/bash
# Check the shutdown function more completely
echo "=== Shutdown function (lines 260-280) ==="
sed -n '260,280p' connection.go

# Check if there are other goroutines that add to waitGroup
echo -e "\n=== All waitGroup.Add calls ==="
rg -n 'waitGroup.Add' connection.go -B2 -A5

# Look for context of the muxer error channel
echo -e "\n=== Check muxer error interaction ==="
sed -n '355,365p' connection.go

Repository: blinklabs-io/gouroboros

Length of output: 1681


🏁 Script executed:

#!/bin/bash
# Look for test files or CI configuration
echo "=== Searching for test files or CI config ==="
fd -t f '\.yml|\.yaml|_test\.go' | head -20

# Check if there's any GitHub workflow
echo -e "\n=== GitHub workflows ==="
fd -t f "workflow" .github/

# Look for recent commits or PR info
echo -e "\n=== Check git log for context ==="
git log --oneline -20 2>/dev/null || echo "Git not available"

Repository: blinklabs-io/gouroboros

Length of output: 212


🏁 Script executed:

#!/bin/bash
# Let's trace through the logic more carefully
# Check what happens after an error is received from muxer

echo "=== Lines 356-390: Error handling path after muxer error ==="
sed -n '356,390p' connection.go

# Check if there are any other sends to errorChan that might not have doneChan guards
echo -e "\n=== All sends to errorChan in the file ==="
rg -n 'errorChan\s*<-' connection.go -B3 -A3

# Check initialization of errorChan
echo -e "\n=== errorChan initialization ==="
rg -n 'errorChan.*make|errorChan.*chan' connection.go -B1 -A1

Repository: blinklabs-io/gouroboros

Length of output: 1956


🏁 Script executed:

#!/bin/bash
# Read the full mini-protocol error handling goroutine
echo "=== Mini-protocol error handling goroutine (lines 456-475) ==="
sed -n '456,475p' connection.go

# Verify there are no other unguarded sends to errorChan
echo -e "\n=== Verify all errorChan operations ==="
rg -n 'c\.errorChan' connection.go

Repository: blinklabs-io/gouroboros

Length of output: 993


🏁 Script executed:

#!/bin/bash
# Check if there's a pattern we should follow from the guarded sends
echo "=== Pattern from protected sends (lines 367-371) ==="
sed -n '367,371p' connection.go

# Verify the line numbers are correct for the actual panic location
echo -e "\n=== Confirm line 469 location and context ==="
sed -n '465,472p' connection.go

# Check if there's any protection mechanism we might have missed
echo -e "\n=== Check if shutdown properly waits for goroutines ==="
sed -n '265,275p' connection.go

Repository: blinklabs-io/gouroboros

Length of output: 880


Unguarded send to errorChan at line 469 causes the race condition, not lines 368 or 376.

Line 469 directly sends to errorChan without protection, unlike lines 368 and 376 which properly guard sends with a nested select checking doneChan. When a mini-protocol error arrives just as Close() is called:

  1. The goroutine passes the outer select at line 461
  2. Reaches line 469 and attempts to send
  3. Meanwhile, shutdown() closes errorChan
  4. Panic: send on closed channel

Fix: Wrap line 469 with the same pattern as lines 368/376:

select {
case c.errorChan <- fmt.Errorf("protocol error: %w", err):
case <-c.doneChan:
    return
}
🤖 Prompt for AI Agents
In connection.go around lines 368–476 (actual problematic send at ~line 469),
the code sends to c.errorChan without guarding against c.doneChan being closed,
causing a race and possible "send on closed channel" panic when shutdown()
closes errorChan; change the unguarded send at line 469 to use the same guarded
pattern as at lines 368 and 376 by wrapping the send in a select that attempts
to send the formatted protocol error to c.errorChan and has a case <-c.doneChan
that returns if done; ensure formatting of the error matches the current
handledErr usage.

Jenita added 2 commits November 29, 2025 22:18
Signed-off-by: Jenita <jkawan@blinklabs.io>
Signed-off-by: Jenita <jkawan@blinklabs.io>
	errorChan     chan<- error
	msg           Message
	errorChan     chan<- error
	stateRespChan chan<- State
Contributor

This doesn't seem to be used

	p.currentStateMu.RLock()
	defer p.currentStateMu.RUnlock()
	return p.currentState
}
Contributor

There is already a getCurrentState() function that does this


// IsDone checks if the protocol is in a done/completed state
func (p *Protocol) IsDone() bool {
	currentState := p.CurrentState()
Contributor

Use p.getCurrentState() instead

connection.go Outdated
	Muxer:     c.muxer,
	Logger:    c.logger,
	ErrorChan: c.protoErrorChan,
	Context:   c.ctx,
Contributor

This doesn't seem to be used anywhere

Jenita added 2 commits December 1, 2025 18:47
Signed-off-by: Jenita <jkawan@blinklabs.io>
Signed-off-by: Jenita <jkawan@blinklabs.io>


Development

Successfully merging this pull request may close these issues.

Don't return an error on connection close when all protocols are shut down

4 participants