Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public async Task<IObservable<TResult>> StartAsync()
return sseClient.Subscription.Select(e => ConvertResult(queryExecutor.ProcessResponse(e, QueryNode.Name, payload.Query)));
}

var wsClient = new WSClient(client.SubscriptionUrl, client.SubscriptionProtocol, payload);
var wsClient = new WSClient(client, payload);
await wsClient.Start();
return wsClient.Subscription.Select(e => ConvertResult(queryExecutor.ProcessResponse(e, QueryNode.Name, payload.Query)));
}
Expand Down
3 changes: 2 additions & 1 deletion src/Linq2GraphQL.Client.Subscriptions/SSEClient.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Net.Http.Headers;
using System.Net.Mime;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
Expand Down Expand Up @@ -34,7 +35,7 @@ public async Task Start()

var request = new HttpRequestMessage(HttpMethod.Post, "")
{
Content = new StringContent(json, Encoding.UTF8, "application/json")
Content = new StringContent(json, Encoding.UTF8, MediaTypeNames.Application.Json)
};

request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream"));
Expand Down
8 changes: 8 additions & 0 deletions src/Linq2GraphQL.Client.Subscriptions/SubscribeCommands.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Linq2GraphQL.Client.Subscriptions
{
internal class SubscribeCommands
{
internal const string SUBSCRIBE = "subscribe";
internal const string START = "start";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Linq2GraphQL.Client.Subscriptions
{
internal static class SubscriptionProtocols
{
internal const string GRAPGQL_TRANSPORT_WS = "graphql-transport-ws";
internal const string GRAPHQL_WS = "graphql-ws";
}
}
48 changes: 29 additions & 19 deletions src/Linq2GraphQL.Client.Subscriptions/WSClient.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Net.WebSockets;
using System.Diagnostics;
using System.Net.WebSockets;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text.Json;
Expand All @@ -9,19 +10,17 @@ namespace Linq2GraphQL.Client.Subscriptions;

public class WSClient : IAsyncDisposable
{
private readonly GraphClient _graphClient;
private readonly GraphQLRequest payload;
private readonly SubscriptionProtocol subscriptionProtocol;

private readonly Subject<string> subscriptionSubject = new();
private readonly string url;
private readonly WebsocketClient client;

private readonly JsonSerializerOptions jsonOptions;

public WSClient(string url, SubscriptionProtocol subprotocol, GraphQLRequest payload)
public WSClient(GraphClient graphClient, GraphQLRequest payload)
{
this.url = url;
subscriptionProtocol = subprotocol;
_graphClient = graphClient;
this.payload = payload;
jsonOptions = new JsonSerializerOptions
{
Expand All @@ -35,7 +34,7 @@ public WSClient(string url, SubscriptionProtocol subprotocol, GraphQLRequest pay
return ws;
});

client = new WebsocketClient(new Uri(url), factory)
client = new WebsocketClient(new Uri(_graphClient.SubscriptionUrl), factory)
{
ReconnectTimeout = TimeSpan.FromSeconds(30)
};
Expand Down Expand Up @@ -66,15 +65,25 @@ public async Task Start()
//Filter General response
var tt = client.MessageReceived.Select(m => JsonSerializer.Deserialize<WebsocketResponse>(m.ToString()));

tt.Where(e => e.Type == "ping").Subscribe(msg => SendRequest(new WebsocketRequest("pong")));
tt.Where(e => e.Type == WebsocketRequestTypes.PING).Subscribe(msg => SendRequest(new WebsocketRequest(WebsocketRequestTypes.PONG)));

tt.Where(e => !string.IsNullOrEmpty(e?.Id)).Subscribe(r =>
{
subscriptionSubject.OnNext(r.Payload?.ToString());
});

await client.Start();
SendRequest(new WebsocketRequest("connection_init"));

var initRequest = new WebsocketRequest(WebsocketRequestTypes.CONNECTION_INIT);
if (_graphClient.WSConnectionInitPayload is not null)
{
var initPayload = await _graphClient.WSConnectionInitPayload(_graphClient);
if (initPayload is not null)
{
initRequest.Payload = initPayload;
}
}
SendRequest(initRequest);

var subscriptionRequest = new WebsocketRequest(GetSubscribeCommand())
{
Expand All @@ -87,37 +96,38 @@ public async Task Start()

private string GetSubprotocolString()
{
switch (subscriptionProtocol)
switch (_graphClient.SubscriptionProtocol)
{
case SubscriptionProtocol.GraphQLWebSocket:
return "graphql-transport-ws";
return SubscriptionProtocols.GRAPGQL_TRANSPORT_WS;

case SubscriptionProtocol.ApolloWebSocket:
return "graphql-ws";
return SubscriptionProtocols.GRAPHQL_WS;

default:
throw new Exception($"{subscriptionProtocol} is unknown");
throw new Exception($"{_graphClient.SubscriptionProtocol} is unknown");
}
}

private string GetSubscribeCommand()
{
switch (subscriptionProtocol)
switch (_graphClient.SubscriptionProtocol)
{
case SubscriptionProtocol.GraphQLWebSocket:
return "subscribe";
return SubscribeCommands.SUBSCRIBE;

case SubscriptionProtocol.ApolloWebSocket:
return "start";
return SubscribeCommands.START;

default:
throw new Exception($"{subscriptionProtocol} is unknown");
throw new Exception($"{_graphClient.SubscriptionProtocol} is unknown");
}
}

private void LogMessage(string message)
private static void LogMessage(string message)
{
Console.WriteLine($"{message} - {DateTime.Now.ToString("T")}");
// Write logs to debug console
Debug.WriteLine($"{message} - {DateTime.Now.ToString("T")}");
}

private void SendRequest(WebsocketRequest request)
Expand Down
2 changes: 1 addition & 1 deletion src/Linq2GraphQL.Client.Subscriptions/WebsocketRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public WebsocketRequest(string type)

[JsonPropertyName("type")] public string Type { get; set; }

[JsonPropertyName("payload")] public GraphQLRequest Payload { get; set; }
[JsonPropertyName("payload")] public object Payload { get; set; }
}

//public class WebsocketRequestPayload
Expand Down
10 changes: 10 additions & 0 deletions src/Linq2GraphQL.Client.Subscriptions/WebsocketRequestTypes.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace Linq2GraphQL.Client.Subscriptions
{
internal class WebsocketRequestTypes
{
internal const string PING = "ping";
internal const string PONG = "pong";
internal const string CONNECTION_INIT = "connection_init";

}
}
1 change: 1 addition & 0 deletions src/Linq2GraphQL.Client/GraphClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public GraphClient(HttpClient httpClient, IOptions<GraphClientOptions> options,
public HttpClient HttpClient { get; }
public JsonSerializerOptions SerializerOptions { get; }

public Func<GraphClient, Task<GraphQLRequest>> WSConnectionInitPayload => options.Value.WSConnectionInitPayload;
private string GetSubscriptionUrl()
{
var baseUrl = HttpClient?.BaseAddress.ToString();
Expand Down
2 changes: 2 additions & 0 deletions src/Linq2GraphQL.Client/GraphClientOptions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@

namespace Linq2GraphQL.Client;

public class GraphClientOptions
{
public bool UseSafeMode { get; set; } = false;
public SubscriptionProtocol SubscriptionProtocol { get; set; } = default;
public Func<GraphClient, Task<GraphQLRequest>> WSConnectionInitPayload { get; set; } = opts => null;
}