Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/libp2p/muxer/yamux/yamux_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace libp2p::connection {
YamuxStream &operator=(const YamuxStream &other) = delete;
YamuxStream(YamuxStream &&other) = delete;
YamuxStream &operator=(YamuxStream &&other) = delete;
~YamuxStream() override = default;
~YamuxStream() override;

YamuxStream(std::shared_ptr<connection::SecureConnection> connection,
YamuxStreamFeedback &feedback,
Expand Down
11 changes: 6 additions & 5 deletions include/libp2p/muxer/yamux/yamuxed_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ namespace libp2p::connection {
void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) override;

private:
using Streams = std::unordered_map<StreamId, std::shared_ptr<YamuxStream>>;
using Streams = std::unordered_map<StreamId, std::weak_ptr<YamuxStream>>;
// Keep strong references for streams that were just created and not yet
// handed over to user handlers to prevent premature destruction.
using PendingStreams =
std::unordered_map<StreamId, std::shared_ptr<YamuxStream>>;

using PendingOutboundStreams =
std::unordered_map<StreamId, StreamHandlerFunc>;
Expand Down Expand Up @@ -176,7 +180,6 @@ namespace libp2p::connection {
/// Expire timer callback
void onExpireTimer();

void setTimerCleanup();
void setTimerPing();

/// Copy of config
Expand Down Expand Up @@ -207,6 +210,7 @@ namespace libp2p::connection {

/// Active streams
Streams streams_;
PendingStreams pending_streams_;

/// Streams just created. Need to call handlers after all
/// data is processed. StreamHandlerFunc is null for inbound streams
Expand All @@ -224,9 +228,6 @@ namespace libp2p::connection {
/// Timer handle for pings
basic::Scheduler::Handle ping_handle_;

/// Cleanup for detached streams
basic::Scheduler::Handle cleanup_handle_;

/// Timer handle for auto closing if inactive
basic::Scheduler::Handle inactivity_handle_;

Expand Down
17 changes: 16 additions & 1 deletion src/muxer/yamux/yamux_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,22 @@ namespace libp2p::connection {
assert(write_queue_limit >= maximum_window_size_);
}

YamuxStream::~YamuxStream() {
// Clean up resources immediately when stream is destroyed
if (!isClosed()) {
// Stream is being destroyed while still active, need to clean up
SL_DEBUG(log(), "YamuxStream {} destroyed while active, cleaning up", stream_id_);

// Close the stream properly
doClose(Error::STREAM_CLOSED_BY_HOST);

// Note: We don't call feedback_.resetStream() here because the connection
// might have already removed this stream from its streams_ map, and calling
// resetStream could cause issues. The connection will handle cleanup
// when it detects the weak_ptr has expired.
}
}

void YamuxStream::readSome(BytesOut out, ReadCallbackFunc cb) {
doRead(out, std::move(cb));
}
Expand Down Expand Up @@ -307,7 +323,6 @@ namespace libp2p::connection {

void YamuxStream::doClose(std::error_code ec) {
// ensure lifetime of this object during doClose
auto self = shared_from_this();

std::optional<FinallyReading> finally_reading;
if (auto reading = qtils::optionTake(reading_)) {
Expand Down
115 changes: 73 additions & 42 deletions src/muxer/yamux/yamuxed_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ namespace libp2p::connection {
}
started_ = true;

setTimerCleanup();

if (config_.ping_interval != std::chrono::milliseconds::zero()) {
setTimerPing();
}
Expand Down Expand Up @@ -235,7 +233,17 @@ namespace libp2p::connection {
continue;
}

auto stream = it->second;
auto stream = it->second.lock();
if (!stream) {
// try pending strong ref (stream could be alive but weak expired)
if (auto pit = pending_streams_.find(id); pit != pending_streams_.end()) {
stream = pit->second;
}
}
if (!stream) {
log()->error("stream {} expired before handler call", id);
continue;
}

if (!handler) {
// inbound
Expand All @@ -247,6 +255,9 @@ namespace libp2p::connection {
handler(std::move(stream));
}

// drop pending strong ref after handler invocation
pending_streams_.erase(id);

if (!started_) {
return;
}
Expand Down Expand Up @@ -318,12 +329,21 @@ namespace libp2p::connection {
return;
}

auto stream = it->second.lock();
if (!stream) {
// stream was destroyed, clean up
SL_DEBUG(log(), "stream {} expired", stream_id);
streams_.erase(it);
reading_state_.discardDataMessage();
return;
}

SL_TRACE(log(),
"YamuxedConnection::processData, stream={}, size={}",
stream_id,
segment.size());

auto result = it->second->onDataReceived(segment);
auto result = stream->onDataReceived(segment);
if (result == YamuxStream::kKeepStream) {
return;
}
Expand Down Expand Up @@ -467,7 +487,15 @@ namespace libp2p::connection {
return;
}

auto result = it->second->onFINReceived();
auto stream = it->second.lock();
if (!stream) {
// stream was destroyed, clean up
SL_DEBUG(log(), "stream {} expired", stream_id);
streams_.erase(it);
return;
}

auto result = stream->onFINReceived();
if (result == YamuxStream::kRemoveStream) {
eraseStream(stream_id);
}
Expand Down Expand Up @@ -495,15 +523,29 @@ namespace libp2p::connection {
return;
}

auto stream = std::move(it->second);
auto stream = it->second.lock();
if (!stream) {
// stream was destroyed, clean up
SL_DEBUG(log(), "stream {} expired", stream_id);
streams_.erase(it);
return;
}

eraseStream(stream_id);
stream->onRSTReceived();
}

bool YamuxedConnection::processWindowUpdate(const YamuxFrame &frame) {
auto it = streams_.find(frame.stream_id);
if (it != streams_.end()) {
it->second->increaseSendWindow(frame.length);
auto stream = it->second.lock();
if (stream) {
stream->increaseSendWindow(frame.length);
} else {
// stream was destroyed, clean up
SL_DEBUG(log(), "stream {} expired during window update", frame.stream_id);
streams_.erase(it);
}
} else {
SL_DEBUG(
log(), "processWindowUpdate: stream {} not found", frame.stream_id);
Expand Down Expand Up @@ -531,8 +573,10 @@ namespace libp2p::connection {
PendingOutboundStreams pending_streams;
pending_streams.swap(pending_outbound_streams_);

for (auto [_, stream] : streams) {
stream->closedByConnection(notify_streams_code);
for (auto [_, stream_weak] : streams) {
if (auto stream = stream_weak.lock()) {
stream->closedByConnection(notify_streams_code);
}
}

for (auto [_, cb] : pending_streams) {
Expand Down Expand Up @@ -589,7 +633,14 @@ namespace libp2p::connection {

enqueue(closeStreamMsg(stream_id));

auto &stream = it->second;
auto stream = it->second.lock();
if (!stream) {
// stream was destroyed, clean up
SL_DEBUG(log(), "stream {} expired during streamClosed", stream_id);
streams_.erase(it);
return;
}

assert(stream->isClosedForWrite());

if (stream->isClosedForRead()) {
Expand Down Expand Up @@ -652,8 +703,15 @@ namespace libp2p::connection {
"onDataWritten : stream {} no longer exists",
packet.stream_id);
} else {
// stream can now call write callbacks
it->second->onDataWritten(sz);
auto stream = it->second.lock();
if (stream) {
// stream can now call write callbacks
stream->onDataWritten(sz);
} else {
// stream was destroyed, clean up
SL_DEBUG(log(), "stream {} expired during write ack", packet.stream_id);
streams_.erase(it);
}
}
}
}
Expand All @@ -676,7 +734,9 @@ namespace libp2p::connection {
stream_id,
config_.maximum_window_size,
basic::WriteQueue::kDefaultSizeLimit);
streams_[stream_id] = stream;
streams_[stream_id] = stream; // Store weak_ptr
// Hold strong ref until handler processes it to avoid premature dtor
pending_streams_[stream_id] = stream;
inactivity_handle_.reset();
return stream;
}
Expand Down Expand Up @@ -717,35 +777,6 @@ namespace libp2p::connection {
}
}

// TODO(turuslan): #240, yamux stream destructor
void YamuxedConnection::setTimerCleanup() {
static constexpr auto kCleanupInterval = std::chrono::seconds(150);
cleanup_handle_ = scheduler_->scheduleWithHandle(
[weak_self{weak_from_this()}] {
auto self = weak_self.lock();
if (not self) {
return;
}
if (not self->started_) {
return;
}
std::vector<StreamId> abandoned;
for (auto &[id, stream] : self->streams_) {
if (stream.use_count() == 1) {
abandoned.push_back(id);
self->enqueue(resetStreamMsg(id));
}
}
if (!abandoned.empty()) {
log()->info("cleaning up {} abandoned streams", abandoned.size());
for (const auto id : abandoned) {
self->streams_.erase(id);
}
}
self->setTimerCleanup();
},
kCleanupInterval);
}

void YamuxedConnection::setTimerPing() {
ping_handle_ = scheduler_->scheduleWithHandle(
Expand Down
Loading