From 94428730f357d38521f11b5f5b6935d1952bea09 Mon Sep 17 00:00:00 2001 From: alienx5499 Date: Fri, 10 Oct 2025 00:52:44 +0530 Subject: [PATCH] Fix yamux stream destructor (Closes #240) --- include/libp2p/muxer/yamux/yamux_stream.hpp | 2 +- .../libp2p/muxer/yamux/yamuxed_connection.hpp | 11 +- src/muxer/yamux/yamux_stream.cpp | 17 ++- src/muxer/yamux/yamuxed_connection.cpp | 115 +++++++++++------- 4 files changed, 96 insertions(+), 49 deletions(-) diff --git a/include/libp2p/muxer/yamux/yamux_stream.hpp b/include/libp2p/muxer/yamux/yamux_stream.hpp index 9815a2a7..23b9f3a9 100644 --- a/include/libp2p/muxer/yamux/yamux_stream.hpp +++ b/include/libp2p/muxer/yamux/yamux_stream.hpp @@ -46,7 +46,7 @@ namespace libp2p::connection { YamuxStream &operator=(const YamuxStream &other) = delete; YamuxStream(YamuxStream &&other) = delete; YamuxStream &operator=(YamuxStream &&other) = delete; - ~YamuxStream() override = default; + ~YamuxStream() override; YamuxStream(std::shared_ptr connection, YamuxStreamFeedback &feedback, diff --git a/include/libp2p/muxer/yamux/yamuxed_connection.hpp b/include/libp2p/muxer/yamux/yamuxed_connection.hpp index 86afe206..dfe7ad93 100644 --- a/include/libp2p/muxer/yamux/yamuxed_connection.hpp +++ b/include/libp2p/muxer/yamux/yamuxed_connection.hpp @@ -79,7 +79,11 @@ namespace libp2p::connection { void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) override; private: - using Streams = std::unordered_map>; + using Streams = std::unordered_map>; + // Keep strong references for streams that were just created and not yet + // handed over to user handlers to prevent premature destruction. + using PendingStreams = + std::unordered_map>; using PendingOutboundStreams = std::unordered_map; @@ -176,7 +180,6 @@ namespace libp2p::connection { /// Expire timer callback void onExpireTimer(); - void setTimerCleanup(); void setTimerPing(); /// Copy of config @@ -207,6 +210,7 @@ namespace libp2p::connection { /// Active streams Streams streams_; + PendingStreams pending_streams_; /// Streams just created. Need to call handlers after all /// data is processed. StreamHandlerFunc is null for inbound streams @@ -224,9 +228,6 @@ namespace libp2p::connection { /// Timer handle for pings basic::Scheduler::Handle ping_handle_; - /// Cleanup for detached streams - basic::Scheduler::Handle cleanup_handle_; - /// Timer handle for auto closing if inactive basic::Scheduler::Handle inactivity_handle_; diff --git a/src/muxer/yamux/yamux_stream.cpp b/src/muxer/yamux/yamux_stream.cpp index 173e94e5..e28ced52 100644 --- a/src/muxer/yamux/yamux_stream.cpp +++ b/src/muxer/yamux/yamux_stream.cpp @@ -65,6 +65,22 @@ namespace libp2p::connection { assert(write_queue_limit >= maximum_window_size_); } + YamuxStream::~YamuxStream() { + // Clean up resources immediately when stream is destroyed + if (!isClosed()) { + // Stream is being destroyed while still active, need to clean up + SL_DEBUG(log(), "YamuxStream {} destroyed while active, cleaning up", stream_id_); + + // Close the stream properly + doClose(Error::STREAM_CLOSED_BY_HOST); + + // Note: We don't call feedback_.resetStream() here because the connection + // might have already removed this stream from its streams_ map, and calling + // resetStream could cause issues. The connection will handle cleanup + // when it detects the weak_ptr has expired. + } + } + void YamuxStream::readSome(BytesOut out, ReadCallbackFunc cb) { doRead(out, std::move(cb)); } @@ -307,7 +323,6 @@ namespace libp2p::connection { void YamuxStream::doClose(std::error_code ec) { // ensure lifetime of this object during doClose - auto self = shared_from_this(); std::optional finally_reading; if (auto reading = qtils::optionTake(reading_)) { diff --git a/src/muxer/yamux/yamuxed_connection.cpp b/src/muxer/yamux/yamuxed_connection.cpp index 96b311c0..007eb023 100644 --- a/src/muxer/yamux/yamuxed_connection.cpp +++ b/src/muxer/yamux/yamuxed_connection.cpp @@ -70,8 +70,6 @@ namespace libp2p::connection { } started_ = true; - setTimerCleanup(); - if (config_.ping_interval != std::chrono::milliseconds::zero()) { setTimerPing(); } @@ -235,7 +233,17 @@ namespace libp2p::connection { continue; } - auto stream = it->second; + auto stream = it->second.lock(); + if (!stream) { + // try pending strong ref (stream could be alive but weak expired) + if (auto pit = pending_streams_.find(id); pit != pending_streams_.end()) { + stream = pit->second; + } + } + if (!stream) { + log()->error("stream {} expired before handler call", id); + continue; + } if (!handler) { // inbound @@ -247,6 +255,9 @@ namespace libp2p::connection { handler(std::move(stream)); } + // drop pending strong ref after handler invocation + pending_streams_.erase(id); + if (!started_) { return; } @@ -318,12 +329,21 @@ namespace libp2p::connection { return; } + auto stream = it->second.lock(); + if (!stream) { + // stream was destroyed, clean up + SL_DEBUG(log(), "stream {} expired", stream_id); + streams_.erase(it); + reading_state_.discardDataMessage(); + return; + } + SL_TRACE(log(), "YamuxedConnection::processData, stream={}, size={}", stream_id, segment.size()); - auto result = it->second->onDataReceived(segment); + auto result = stream->onDataReceived(segment); if (result == YamuxStream::kKeepStream) { return; } @@ -467,7 +487,15 @@ namespace libp2p::connection { return; } - auto result = it->second->onFINReceived(); + auto stream = it->second.lock(); + if (!stream) { + // stream was destroyed, clean up + SL_DEBUG(log(), "stream {} expired", stream_id); + streams_.erase(it); + return; + } + + auto result = stream->onFINReceived(); if (result == YamuxStream::kRemoveStream) { eraseStream(stream_id); } @@ -495,7 +523,14 @@ namespace libp2p::connection { return; } - auto stream = std::move(it->second); + auto stream = it->second.lock(); + if (!stream) { + // stream was destroyed, clean up + SL_DEBUG(log(), "stream {} expired", stream_id); + streams_.erase(it); + return; + } + eraseStream(stream_id); stream->onRSTReceived(); } @@ -503,7 +538,14 @@ namespace libp2p::connection { bool YamuxedConnection::processWindowUpdate(const YamuxFrame &frame) { auto it = streams_.find(frame.stream_id); if (it != streams_.end()) { - it->second->increaseSendWindow(frame.length); + auto stream = it->second.lock(); + if (stream) { + stream->increaseSendWindow(frame.length); + } else { + // stream was destroyed, clean up + SL_DEBUG(log(), "stream {} expired during window update", frame.stream_id); + streams_.erase(it); + } } else { SL_DEBUG( log(), "processWindowUpdate: stream {} not found", frame.stream_id); @@ -531,8 +573,10 @@ namespace libp2p::connection { PendingOutboundStreams pending_streams; pending_streams.swap(pending_outbound_streams_); - for (auto [_, stream] : streams) { - stream->closedByConnection(notify_streams_code); + for (auto [_, stream_weak] : streams) { + if (auto stream = stream_weak.lock()) { + stream->closedByConnection(notify_streams_code); + } } for (auto [_, cb] : pending_streams) { @@ -589,7 +633,14 @@ namespace libp2p::connection { enqueue(closeStreamMsg(stream_id)); - auto &stream = it->second; + auto stream = it->second.lock(); + if (!stream) { + // stream was destroyed, clean up + SL_DEBUG(log(), "stream {} expired during streamClosed", stream_id); + streams_.erase(it); + return; + } + assert(stream->isClosedForWrite()); if (stream->isClosedForRead()) { @@ -652,8 +703,15 @@ namespace libp2p::connection { "onDataWritten : stream {} no longer exists", packet.stream_id); } else { - // stream can now call write callbacks - it->second->onDataWritten(sz); + auto stream = it->second.lock(); + if (stream) { + // stream can now call write callbacks + stream->onDataWritten(sz); + } else { + // stream was destroyed, clean up + SL_DEBUG(log(), "stream {} expired during write ack", packet.stream_id); + streams_.erase(it); + } } } } @@ -676,7 +734,9 @@ namespace libp2p::connection { stream_id, config_.maximum_window_size, basic::WriteQueue::kDefaultSizeLimit); - streams_[stream_id] = stream; + streams_[stream_id] = stream; // Store weak_ptr + // Hold strong ref until handler processes it to avoid premature dtor + pending_streams_[stream_id] = stream; inactivity_handle_.reset(); return stream; } @@ -717,35 +777,6 @@ namespace libp2p::connection { } } - // TODO(turuslan): #240, yamux stream destructor - void YamuxedConnection::setTimerCleanup() { - static constexpr auto kCleanupInterval = std::chrono::seconds(150); - cleanup_handle_ = scheduler_->scheduleWithHandle( - [weak_self{weak_from_this()}] { - auto self = weak_self.lock(); - if (not self) { - return; - } - if (not self->started_) { - return; - } - std::vector abandoned; - for (auto &[id, stream] : self->streams_) { - if (stream.use_count() == 1) { - abandoned.push_back(id); - self->enqueue(resetStreamMsg(id)); - } - } - if (!abandoned.empty()) { - log()->info("cleaning up {} abandoned streams", abandoned.size()); - for (const auto id : abandoned) { - self->streams_.erase(id); - } - } - self->setTimerCleanup(); - }, - kCleanupInterval); - } void YamuxedConnection::setTimerPing() { ping_handle_ = scheduler_->scheduleWithHandle(