Skip to content

Conversation

@DaleSeo
Copy link
Contributor

@DaleSeo DaleSeo commented Dec 28, 2025

Fixes #529

Motivation and Context

This PR implements modelcontextprotocol/modelcontextprotocol#1699 (Server-Initiated SSE Stream Disconnection), enabling MCP servers to disconnect SSE streams at will while allowing clients to reconnect via Last-Event-ID.

This implementation follows modelcontextprotocol/typescript-sdk#1061 as a reference.

ServerSseMessage now includes a retry: Option<Duration> field following the SSE specification's retry: field for client reconnection timing. The message field changed from Arc<ServerJsonRpcMessage> to Option<Arc<ServerJsonRpcMessage>> to support priming events, which carry an event ID and retry interval but no message payload.

LocalSessionHandle gains two methods for server-initiated disconnection: close_sse_stream() closes request-specific (POST) streams and close_standalone_sse_stream() closes standalone (GET) streams. Both methods optionally send a priming event before closing to inform clients of the recommended reconnection delay.

StreamableHttpServerConfig now automatically enables priming events when stateful_mode is true (the default). The default retry interval is 3 seconds, matching the TypeScript SDK behavior.

use std::time::Duration;

// Priming is automatic with stateful_mode (default: true) and 3-second retry interval
let config = StreamableHttpServerConfig::default();

// Override the retry interval
let config = StreamableHttpServerConfig {
    sse_retry: Some(Duration::from_secs(5)),
    ..Default::default()
};

// Disable priming
let config = StreamableHttpServerConfig {
    sse_retry: None,
    ..Default::default()
};

// Close a POST stream
session.close_sse_stream(http_request_id, Some(Duration::from_secs(3))).await?;

// Close a standalone GET stream
session.close_standalone_sse_stream(Some(Duration::from_secs(3))).await?;

How Has This Been Tested?

Added new integration tests to verify the priming behavior.

Breaking Changes

The message field in ServerSseMessage changed from Arc<ServerJsonRpcMessage> to Option<Arc<ServerJsonRpcMessage>> to support priming events, which have no message payload. Existing code constructing ServerSseMessage directly will need to wrap the message in Some().

// Wrap message in Some() and add retry field
let msg = ServerSseMessage {
    event_id: Some("1".to_string()),
    message: Some(Arc::new(json_rpc_message)),
    retry: None,
};

I considered several approaches to avoid the breaking change to ServerSseMessage. One option was to add an is_priming: bool flag while keeping message as Arc<ServerJsonRpcMessage>:

pub struct ServerSseMessage {
    pub event_id: Option<String>,
    pub message: Arc<ServerJsonRpcMessage>,  // unchanged
    pub retry: Option<Duration>,
    pub is_priming: bool,  // new flag
}

This would check the flag in sse_stream_response() to emit empty data for priming events. However, priming events would still carry a meaningless message field, which is semantically incorrect.

Another option was to convert ServerSseMessage from a struct to an enum:

pub enum ServerSseMessage {
    Message {
        event_id: Option<String>,
        message: Arc<ServerJsonRpcMessage>,
        retry: Option<Duration>,
    },
    Priming {
        event_id: String,
        retry: Duration,
    },
}

This would be type-safe but should be a bigger breaking change (struct to enum), and arguably more disruptive than the current approach.

I chose Option<Arc<...>> because it accurately represents that priming events have no message, and ServerSseMessage is primarily used internally rather than by external users.

pub struct ServerSseMessage {
    pub event_id: Option<String>,
    pub message: Option<Arc<ServerJsonRpcMessage>>,  // changed from Arc<...> to Option<Arc<...>>
    pub retry: Option<Duration>,
}

I'm open to feedback on whether a different approach would be preferred.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Documentation update

Checklist

  • I have read the MCP Documentation
  • My code follows the repository's style guidelines
  • New and existing tests pass locally
  • I have added appropriate error handling
  • I have added or updated documentation as needed

Additional context

Client-side reconnection logic was already implemented in client_side_sse.rs.

@github-actions github-actions bot added T-dependencies Dependencies related changes T-test Testing related changes T-config Configuration file changes T-core Core library changes T-transport Transport layer changes labels Dec 28, 2025
@DaleSeo DaleSeo changed the title Sep 1699 Implement SEP-1699: Support SSE Polling via Server-Side Disconnect Dec 28, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

T-config Configuration file changes T-core Core library changes T-dependencies Dependencies related changes T-test Testing related changes T-transport Transport layer changes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement SEP-1699: Support SSE Polling via Server-Side Disconnect

1 participant