Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/ModelContextProtocol.Core/Client/ClientCompletionDetails.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
namespace ModelContextProtocol.Client;

/// <summary>
/// Provides details about why an MCP client session completed.
/// </summary>
/// <remarks>
/// <para>
/// Transport implementations may return derived types with additional strongly-typed
/// information, such as <see cref="StdioClientCompletionDetails"/>.
/// </para>
/// </remarks>
public class ClientCompletionDetails
{
/// <summary>
/// Gets the exception that caused the session to close, if any.
/// </summary>
/// <remarks>
/// This is <see langword="null"/> for graceful closure.
/// </remarks>
public Exception? Exception { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System.Net;

namespace ModelContextProtocol.Client;

/// <summary>
/// Provides details about the completion of an HTTP-based MCP client session,
/// including sessions using the legacy SSE transport or the Streamable HTTP transport.
/// </summary>
public sealed class HttpClientCompletionDetails : ClientCompletionDetails
{
/// <summary>
/// Gets the HTTP status code that caused the session to close, or <see langword="null"/> if unavailable.
/// </summary>
public HttpStatusCode? HttpStatusCode { get; set; }
}
17 changes: 17 additions & 0 deletions src/ModelContextProtocol.Core/Client/McpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,21 @@ protected McpClient()
/// </para>
/// </remarks>
public abstract string? ServerInstructions { get; }

/// <summary>
/// Gets a <see cref="Task{TResult}"/> that completes when the client session has completed.
/// </summary>
/// <remarks>
/// <para>
/// The task always completes successfully. The result provides details about why the session
/// completed. Transport implementations may return derived types with additional strongly-typed
/// information, such as <see cref="StdioClientCompletionDetails"/>.
/// </para>
/// <para>
/// For graceful closure (e.g., explicit disposal), <see cref="ClientCompletionDetails.Exception"/>
/// will be <see langword="null"/>. For unexpected closure (e.g., process crash, network failure),
/// it may contain an exception that caused or that represents the failure.
/// </para>
/// </remarks>
public abstract Task<ClientCompletionDetails> Completion { get; }
}
11 changes: 11 additions & 0 deletions src/ModelContextProtocol.Core/Client/McpClientImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,9 @@ private void RegisterTaskHandlers(RequestHandlers requestHandlers, IMcpTaskStore
/// <inheritdoc/>
public override string? ServerInstructions => _serverInstructions;

/// <inheritdoc/>
public override Task<ClientCompletionDetails> Completion => _sessionHandler.CompletionTask;

/// <summary>
/// Asynchronously connects to an MCP server, establishes the transport connection, and completes the initialization handshake.
/// </summary>
Expand Down Expand Up @@ -655,6 +658,14 @@ public override async ValueTask DisposeAsync()
_taskCancellationTokenProvider?.Dispose();
await _sessionHandler.DisposeAsync().ConfigureAwait(false);
await _transport.DisposeAsync().ConfigureAwait(false);

// After disposal, the channel writer is complete but ProcessMessagesCoreAsync
// may have been cancelled with unread items still buffered. ChannelReader.Completion
// only resolves once all items are consumed, so drain remaining items.
while (_transport.MessageReader.TryRead(out var _));

// Then ensure all work has quiesced.
await Completion.ConfigureAwait(false);
}

[LoggerMessage(Level = LogLevel.Information, Message = "{EndpointName} client received server '{ServerInfo}' capabilities: '{Capabilities}'.")]
Expand Down
17 changes: 15 additions & 2 deletions src/ModelContextProtocol.Core/Client/SseClientSessionTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Microsoft.Extensions.Logging.Abstractions;
using ModelContextProtocol.Protocol;
using System.Diagnostics;
using System.Net;
using System.Net.Http.Headers;
using System.Net.ServerSentEvents;
using System.Text.Json;
Expand Down Expand Up @@ -124,7 +125,7 @@ private async Task CloseAsync()
}
finally
{
SetDisconnected();
SetDisconnected(new TransportClosedException(new HttpClientCompletionDetails()));
}
}

Expand All @@ -143,6 +144,7 @@ public override async ValueTask DisposeAsync()

private async Task ReceiveMessagesAsync(CancellationToken cancellationToken)
{
HttpStatusCode? failureStatusCode = null;
try
{
using var request = new HttpRequestMessage(HttpMethod.Get, _sseEndpoint);
Expand All @@ -151,6 +153,11 @@ private async Task ReceiveMessagesAsync(CancellationToken cancellationToken)

using var response = await _httpClient.SendAsync(request, message: null, cancellationToken).ConfigureAwait(false);

if (!response.IsSuccessStatusCode)
{
failureStatusCode = response.StatusCode;
}

await response.EnsureSuccessStatusCodeWithResponseBodyAsync(cancellationToken).ConfigureAwait(false);

using var stream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -179,14 +186,20 @@ private async Task ReceiveMessagesAsync(CancellationToken cancellationToken)
}
else
{
SetDisconnected(new TransportClosedException(new HttpClientCompletionDetails
{
HttpStatusCode = failureStatusCode,
Exception = ex,
}));

LogTransportReadMessagesFailed(Name, ex);
_connectionEstablished.TrySetException(ex);
throw;
}
}
finally
{
SetDisconnected();
SetDisconnected(new TransportClosedException(new HttpClientCompletionDetails()));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace ModelContextProtocol.Client;

/// <summary>
/// Provides details about the completion of a stdio-based MCP client session.
/// </summary>
public sealed class StdioClientCompletionDetails : ClientCompletionDetails
{
/// <summary>
/// Gets the process ID of the server process, or <see langword="null"/> if unavailable.
/// </summary>
public int? ProcessId { get; set; }

/// <summary>
/// Gets the exit code of the server process, or <see langword="null"/> if unavailable.
/// </summary>
public int? ExitCode { get; set; }

/// <summary>
/// Gets the last lines of the server process's standard error output, or <see langword="null"/> if unavailable.
/// </summary>
public IReadOnlyList<string>? StandardErrorTail { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,22 @@
namespace ModelContextProtocol.Client;

/// <summary>Provides the client side of a stdio-based session transport.</summary>
internal sealed class StdioClientSessionTransport(
StdioClientTransportOptions options, Process process, string endpointName, Queue<string> stderrRollingLog, ILoggerFactory? loggerFactory) :
StreamClientSessionTransport(process.StandardInput.BaseStream, process.StandardOutput.BaseStream, encoding: null, endpointName, loggerFactory)
internal sealed class StdioClientSessionTransport : StreamClientSessionTransport
{
private readonly StdioClientTransportOptions _options = options;
private readonly Process _process = process;
private readonly Queue<string> _stderrRollingLog = stderrRollingLog;
private readonly StdioClientTransportOptions _options;
private readonly Process _process;
private readonly Queue<string> _stderrRollingLog;
private int _cleanedUp = 0;
private readonly int? _processId;

public StdioClientSessionTransport(StdioClientTransportOptions options, Process process, string endpointName, Queue<string> stderrRollingLog, ILoggerFactory? loggerFactory) :
base(process.StandardInput.BaseStream, process.StandardOutput.BaseStream, encoding: null, endpointName, loggerFactory)
{
_options = options;
_process = process;
_stderrRollingLog = stderrRollingLog;
try { _processId = process.Id; } catch { }
}

/// <inheritdoc/>
public override async Task SendMessageAsync(JsonRpcMessage message, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -47,17 +55,26 @@ protected override async ValueTask CleanupAsync(Exception? error = null, Cancell
// so create an exception with details about that.
error ??= await GetUnexpectedExitExceptionAsync(cancellationToken).ConfigureAwait(false);

// Now terminate the server process.
// Terminate the server process (or confirm it already exited), then build
// and publish strongly-typed completion details while the process handle
// is still valid so we can read the exit code.
try
{
StdioClientTransport.DisposeProcess(_process, processRunning: true, shutdownTimeout: _options.ShutdownTimeout);
StdioClientTransport.DisposeProcess(
_process,
processRunning: true,
_options.ShutdownTimeout,
beforeDispose: () => SetDisconnected(new TransportClosedException(BuildCompletionDetails(error))));
}
catch (Exception ex)
{
LogTransportShutdownFailed(Name, ex);
SetDisconnected(new TransportClosedException(BuildCompletionDetails(error)));
}

// And handle cleanup in the base type.
// And handle cleanup in the base type. SetDisconnected has already been
// called above, so the base call is a no-op for disconnect state but
// still performs other cleanup (cancelling the read task, etc.).
await base.CleanupAsync(error, cancellationToken).ConfigureAwait(false);
}

Expand Down Expand Up @@ -104,4 +121,32 @@ protected override async ValueTask CleanupAsync(Exception? error = null, Cancell

return new IOException(errorMessage);
}

private StdioClientCompletionDetails BuildCompletionDetails(Exception? error)
{
StdioClientCompletionDetails details = new()
{
Exception = error,
ProcessId = _processId,
};

try
{
if (StdioClientTransport.HasExited(_process))
{
details.ExitCode = _process.ExitCode;
}
}
catch { }

lock (_stderrRollingLog)
{
if (_stderrRollingLog.Count > 0)
{
details.StandardErrorTail = _stderrRollingLog.ToArray();
}
}

return details;
}
}
6 changes: 5 additions & 1 deletion src/ModelContextProtocol.Core/Client/StdioClientTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public async Task<ITransport> ConnectAsync(CancellationToken cancellationToken =
}

internal static void DisposeProcess(
Process? process, bool processRunning, TimeSpan shutdownTimeout)
Process? process, bool processRunning, TimeSpan shutdownTimeout, Action? beforeDispose = null)
{
if (process is not null)
{
Expand All @@ -239,6 +239,10 @@ internal static void DisposeProcess(
{
process.WaitForExit();
}

// Invoke the callback while the process handle is still valid,
// e.g. to read ExitCode before Dispose() invalidates it.
beforeDispose?.Invoke();
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ internal sealed partial class StreamableHttpClientSessionTransport : TransportBa

private readonly McpHttpClient _httpClient;
private readonly HttpClientTransportOptions _options;
private readonly CancellationTokenSource _connectionCts;
private readonly CancellationTokenSource _connectionCts = new();
private readonly ILogger _logger;

private string? _negotiatedProtocolVersion;
private Task? _getReceiveTask;
private volatile TransportClosedException? _disconnectError;

private readonly SemaphoreSlim _disposeLock = new(1, 1);
private bool _disposed;
Expand All @@ -42,7 +43,6 @@ public StreamableHttpClientSessionTransport(

_options = transportOptions;
_httpClient = httpClient;
_connectionCts = new CancellationTokenSource();
_logger = (ILogger?)loggerFactory?.CreateLogger<HttpClientTransport>() ?? NullLogger.Instance;

// We connect with the initialization request with the MCP transport. This means that any errors won't be observed
Expand Down Expand Up @@ -96,6 +96,13 @@ internal async Task<HttpResponseMessage> SendHttpRequestAsync(JsonRpcMessage mes
// We'll let the caller decide whether to throw or fall back given an unsuccessful response.
if (!response.IsSuccessStatusCode)
{
// Per the MCP spec, a 404 response to a request containing an Mcp-Session-Id
// indicates the session has ended. Signal completion so McpClient.Completion resolves.
if (response.StatusCode == HttpStatusCode.NotFound && SessionId is not null)
{
SetSessionExpired(response.StatusCode);
}

return response;
}

Expand Down Expand Up @@ -184,18 +191,14 @@ public override async ValueTask DisposeAsync()
{
LogTransportShutdownFailed(Name, ex);
}
finally
{
_connectionCts.Dispose();
}
}
finally
{
// If we're auto-detecting the transport and failed to connect, leave the message Channel open for the SSE transport.
// This class isn't directly exposed to public callers, so we don't have to worry about changing the _state in this case.
if (_options.TransportMode is not HttpTransportMode.AutoDetect || _getReceiveTask is not null)
{
SetDisconnected();
SetDisconnected(_disconnectError ?? new TransportClosedException(new HttpClientCompletionDetails()));
}
}
}
Expand All @@ -204,8 +207,8 @@ private async Task ReceiveUnsolicitedMessagesAsync()
{
var state = new SseStreamState();

// Continuously receive unsolicited messages until canceled
while (!_connectionCts.Token.IsCancellationRequested)
// Continuously receive unsolicited messages until canceled or disconnected
while (!_connectionCts.Token.IsCancellationRequested && IsConnected)
{
await SendGetSseRequestWithRetriesAsync(
relatedRpcRequest: null,
Expand Down Expand Up @@ -285,6 +288,13 @@ await SendGetSseRequestWithRetriesAsync(

if (!response.IsSuccessStatusCode)
{
// Per the MCP spec, a 404 response to a request containing an Mcp-Session-Id
// indicates the session has ended. Signal completion so McpClient.Completion resolves.
if (response.StatusCode == HttpStatusCode.NotFound && SessionId is not null)
{
SetSessionExpired(response.StatusCode);
}

// If the server could be reached but returned a non-success status code,
// retrying likely won't change that.
return null;
Expand Down Expand Up @@ -474,4 +484,23 @@ private static TimeSpan ElapsedSince(long stopwatchTimestamp)
return TimeSpan.FromSeconds((double)(Stopwatch.GetTimestamp() - stopwatchTimestamp) / Stopwatch.Frequency);
#endif
}

private void SetSessionExpired(HttpStatusCode statusCode)
{
// Store the error before canceling so DisposeAsync can use it if it races us, especially
// after the call to Cancel below, to invoke SetDisconnected.
_disconnectError = new TransportClosedException(new HttpClientCompletionDetails
{
HttpStatusCode = statusCode,
Exception = new McpException(
"The server returned HTTP 404 for a request with an Mcp-Session-Id, indicating the session has expired. " +
"To continue, create a new client session or call ResumeSessionAsync with a new connection."),
});

// Cancel to unblock any in-flight operations (e.g., SSE stream reads in
// SendGetSseRequestWithRetriesAsync) that are waiting on _connectionCts.Token.
_connectionCts.Cancel();

SetDisconnected(_disconnectError);
}
}
Loading
Loading