Pipe: drop sink before processor and source on task drop#17916

Open

luoluoyuyu wants to merge 1 commit into apache:master from luoluoyuyu:fix/pipe-drop-interrupt-supply-thread

@luoluoyuyu

Member

Summary

  • Change DataNode pipe task drop order to sink → processor → source
  • Stop the connector first so downstream forwarding stops promptly when a pipe is dropped
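The new order in the summary can be pictured with a minimal sketch. The `Stage` interface and `DropOrderSketch` class below are hypothetical stand-ins, not the real IoTDB `PipeTaskStage` classes; the sketch only records which stage is dropped when.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a pipe task stage; not the real IoTDB PipeTaskStage.
interface Stage {
  void drop();
}

final class DropOrderSketch {
  // Drops the three stages in the order proposed by this PR and
  // returns the order in which they were dropped.
  static List<String> dropTask() {
    List<String> log = new ArrayList<>();
    Stage source = () -> log.add("source");
    Stage processor = () -> log.add("processor");
    Stage sink = () -> log.add("sink");

    sink.drop();      // stop downstream forwarding first
    processor.drop(); // then the processor
    source.drop();    // finally the source
    return log;
  }

  public static void main(String[] args) {
    System.out.println(dropTask());
  }
}
```

Dropping the sink first means no further events leave the node while the upstream stages are still shutting down, which is the behaviour the test plan checks.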

Test plan

  • DROP PIPE while historical pipe is actively forwarding data
  • Verify receiver stops receiving new data promptly after DROP PIPE
  • Verify SHOW PIPES reflects dropped pipe state correctly

Made with Cursor

@Caideyipi

Collaborator

Thanks for the fix. I think the new drop order has a side effect for IoTConsensusV2.

PipeTaskStage.drop() stops and then drops the stage, so with sinkStage.drop() first, PipeSinkSubtaskManager.deregister() now runs before the processor/source stages are dropped. That method deregisters PipeEventCommitManager and resets ReplicateProgressDataNodeManager, and its current comment explicitly assumes extractor and processor have already been dropped. With the new order, that assumption is no longer true.

There is still a race window: the processor execution thread may deliver events downstream after close, and the source can still assign IoTConsensusV2 replicate indexes while supply() is racing with drop. In that case the sink can reset the replicate index to 0 first, and the source or processor can then assign or process another event. That defeats the reset and can leave stale, non-zero leader replicate progress after the pipe has been dropped.
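One way to see the problem is to replay the interleaving deterministically on a single thread. This is only a sketch: `StaleReplicateIndexSketch` and its methods are hypothetical, and the `AtomicLong` counter merely stands in for the replicate progress that `ReplicateProgressDataNodeManager` tracks.

```java
import java.util.concurrent.atomic.AtomicLong;

final class StaleReplicateIndexSketch {
  // Hypothetical stand-in for per-pipe replicate progress.
  private final AtomicLong replicateIndex = new AtomicLong();

  // What the source side would do when it supplies an event (assumed behaviour).
  long assignReplicateIndex() {
    return replicateIndex.incrementAndGet();
  }

  // What the sink side would do when it deregisters (assumed behaviour).
  void resetReplicateIndex() {
    replicateIndex.set(0);
  }

  long current() {
    return replicateIndex.get();
  }

  // Replays the problematic ordering: reset happens before the last assignment.
  static long replayInterleaving() {
    StaleReplicateIndexSketch progress = new StaleReplicateIndexSketch();
    progress.assignReplicateIndex(); // normal forwarding before DROP PIPE
    progress.assignReplicateIndex();
    progress.resetReplicateIndex();  // sink dropped first: progress reset to 0
    progress.assignReplicateIndex(); // a late supply() still assigns an index
    return progress.current();       // non-zero although the pipe is dropped
  }

  public static void main(String[] args) {
    System.out.println("progress after drop: " + replayInterleaving());
  }
}
```

In the real system the last assignment would come from a concurrent thread, so the stale value would appear only intermittently.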

Could we split the operation so that the sink is stopped first to stop forwarding promptly, then processor/source are dropped, and only then the sink does deregister/reset? Alternatively, move the IoTV2 replicate-index reset and committer deregistration to a point after processor/source are known to be dropped. A test covering DROP PIPE while a historical or realtime consensus pipe is actively forwarding would also help guard this ordering.
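The first suggestion could look roughly like the following. The `Sink` class and its `stop()` and `deregisterAndReset()` methods are hypothetical names chosen for illustration and do not correspond to existing IoTDB methods; the point is only the three-phase ordering.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitDropSketch {
  // Hypothetical sink whose stop is separated from its deregister/reset step.
  static final class Sink {
    private final List<String> log;

    Sink(List<String> log) {
      this.log = log;
    }

    // Phase 1: stop forwarding, but keep committer and progress state intact.
    void stop() {
      log.add("sink.stop");
    }

    // Phase 3: deregister the committer and reset replicate progress.
    void deregisterAndReset() {
      log.add("sink.deregisterAndReset");
    }
  }

  // Drops a task in three phases and returns the sequence of steps taken.
  static List<String> dropTask() {
    List<String> log = new ArrayList<>();
    Sink sink = new Sink(log);

    sink.stop();                // forwarding stops promptly
    log.add("processor.drop");  // Phase 2: upstream stages are dropped
    log.add("source.drop");
    sink.deregisterAndReset();  // nothing upstream can assign an index now
    return log;
  }

  public static void main(String[] args) {
    dropTask().forEach(System.out::println);
  }
}
```

With this split, the reset runs only after the processor and source are gone, so the assumption in the existing `deregister()` comment would hold again while forwarding still stops first.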
