Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ let package = Package(
products: [
.library(name: "Split", targets: ["Split"]),

.library(name: "SplitCommons", targets: ["Logging", "Http", "BackoffCounter", "PeriodicRecorderWorker", "Tracker", "Concurrency"]),],
.library(name: "SplitCommons", targets: ["Logging", "Http", "BackoffCounter", "PeriodicRecorderWorker", "Tracker", "Concurrency", "Streaming"]),],
targets: [

// MARK: Split
.target(
name: "Split",
dependencies: ["BackoffCounter", "Concurrency", "Http", "Logging", "PeriodicRecorderWorker", "Tracker"],
dependencies: ["BackoffCounter", "Concurrency", "Http", "Logging", "PeriodicRecorderWorker", "Streaming", "Tracker"],
path: "Split",
exclude: [
"Common/Yaml/LICENSE",
Expand Down Expand Up @@ -85,7 +85,7 @@ let package = Package(
),
.testTarget(
name: "TrackerTests",
dependencies: ["TrackerTests"],
dependencies: ["Tracker"],
path: "Sources/Tracker/Tests"
),

Expand All @@ -99,6 +99,17 @@ let package = Package(
dependencies: ["Concurrency"],
path: "Sources/Concurrency/Tests"
),

.target(
name: "Streaming",
dependencies: ["Concurrency", "Http", "Logging"],
exclude: ["Tests", "README.md"]
),
.testTarget(
name: "StreamingTests",
dependencies: ["Streaming"],
path: "Sources/Streaming/Tests"
),
// #INJECT_TARGET
]
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@

import Foundation

class EventStreamParser: @unchecked Sendable {
static let kIdField = "id"
static let kDataField = "data"
static let kEventField = "event"
public class EventStreamParser: @unchecked Sendable {
public static let kIdField = "id"
public static let kDataField = "data"
public static let kEventField = "event"
private static let kKeepAliveEvent = "keepalive"
private static let kFieldSeparator: Character = ":"
private static let kKeepAliveToken = "\(kFieldSeparator)\(kKeepAliveEvent)"

func parse(streamChunk: String) -> [String: String] {
public init() {}

public func parse(streamChunk: String) -> [String: String] {

var messageValues = [String: String]()
let messageLines = streamChunk.split(separator: "\n")
Expand All @@ -27,7 +29,7 @@ class EventStreamParser: @unchecked Sendable {
return messageValues
}

if trimmedLine.isEmpty() {
if trimmedLine.isEmpty {
return messageValues
}

Expand All @@ -47,7 +49,7 @@ class EventStreamParser: @unchecked Sendable {
return messageValues
}

func isKeepAlive(values: [String: String]) -> Bool {
public func isKeepAlive(values: [String: String]) -> Bool {
return values.contains { eventType, value in
return eventType == Self.kEventField &&
value.trimmingCharacters(in: .whitespacesAndNewlines).lowercased() == Self.kKeepAliveEvent
Expand Down
1 change: 1 addition & 0 deletions Sources/Streaming/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Streaming
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,26 @@
//

import Foundation

struct SseClientConstants {
static let pushNotificationChannelsParam = "channels"
static let pushNotificationTokenParam = "accessToken"
static let pushNotificationVersionParam = "v"
static let pushNotificationVersionValue = "1.1"
import Concurrency
import Http
import Logging

public struct SseClientConstants {
public static let pushNotificationChannelsParam = "channels"
public static let pushNotificationTokenParam = "accessToken"
public static let pushNotificationVersionParam = "v"
public static let pushNotificationVersionValue = "1.1"
}

protocol SseClient: AnyObject {
public protocol SseClient: AnyObject {
typealias CompletionHandler = @Sendable (Bool) -> Void
func connect(token: String, channels: [String], completion: @escaping CompletionHandler)
func disconnect()
var isConnectionOpened: Bool { get }

}

class DefaultSseClient: SseClient, @unchecked Sendable {
public class DefaultSseClient: SseClient, @unchecked Sendable {

///
/// NOTE:
Expand All @@ -38,18 +42,18 @@ class DefaultSseClient: SseClient, @unchecked Sendable {
private var isDisconnectCalled: Atomic<Bool> = Atomic(false)
private var isConnected: Atomic<Bool> = Atomic(false)
private var isFirstMessage: Atomic<Bool> = Atomic(false)
var isConnectionOpened: Bool {
public var isConnectionOpened: Bool {
return isConnected.value
}

init(endpoint: Endpoint, httpClient: HttpClient, sseHandler: SseHandler) {
public init(endpoint: Endpoint, httpClient: HttpClient, sseHandler: SseHandler) {
self.endpoint = endpoint
self.httpClient = httpClient
self.sseHandler = sseHandler
self.queue = DispatchQueue(label: "split-sse-client")
}

func connect(token: String, channels: [String], completion: @escaping CompletionHandler) {
public func connect(token: String, channels: [String], completion: @escaping CompletionHandler) {
queue.async(flags: .barrier) { [weak self] in
guard let self = self else { return }
let parameters: [String: Any] = [
Expand Down Expand Up @@ -105,7 +109,7 @@ class DefaultSseClient: SseClient, @unchecked Sendable {
}
}

func disconnect() {
public func disconnect() {
Logger.d("Disconnecting SSE client")
isDisconnectCalled.set(true)
isConnected.set(false)
Expand All @@ -132,7 +136,7 @@ class DefaultSseClient: SseClient, @unchecked Sendable {

guard let self = self else { return }

let values = self.streamParser.parse(streamChunk: data.stringRepresentation)
let values = self.streamParser.parse(streamChunk: String(data: data, encoding: .utf8) ?? "")

if self.isFirstMessage.value {
if self.isConnectionConfirmed(message: values) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,26 @@
//

import Foundation
import Http

protocol SseClientFactory {
public protocol SseClientFactory {
func create() -> SseClient
}

class DefaultSseClientFactory: SseClientFactory {
public class DefaultSseClientFactory: SseClientFactory {
private let endpoint: Endpoint
private let httpClient: HttpClient
private let sseHandler: SseHandler

init(endpoint: Endpoint,
public init(endpoint: Endpoint,
httpClient: HttpClient,
sseHandler: SseHandler) {
self.endpoint = endpoint
self.httpClient = httpClient
self.sseHandler = sseHandler
}

func create() -> SseClient {
public func create() -> SseClient {
DefaultSseClient(endpoint: endpoint,
httpClient: httpClient,
sseHandler: sseHandler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,35 @@
//

import Foundation
import Concurrency
import Logging

class SseConnectionHandler: @unchecked Sendable {
public class SseConnectionHandler: @unchecked Sendable {
private let clientLock = NSLock()
private let sseClientFactory: SseClientFactory
private var curClientId: String?
private let clients = SynchronizedDictionary<String, SseClient>()

var isConnectionOpened: Bool {
public var isConnectionOpened: Bool {
guard let id = curClientId else { return false }
return clients.value(forKey: id)?.isConnectionOpened ?? false
}

init(sseClientFactory: SseClientFactory) {
public init(sseClientFactory: SseClientFactory) {
self.sseClientFactory = sseClientFactory
}

func connect(jwt: JwtToken, channels: [String], completion: @escaping SseClient.CompletionHandler) {
public func connect(token: String, channels: [String], completion: @escaping SseClient.CompletionHandler) {
let sseClient = sseClientFactory.create()
addSseClient(sseClient)
sseClient.connect(token: jwt.rawToken, channels: jwt.channels, completion: completion)
sseClient.connect(token: token, channels: channels, completion: completion)
}

func disconnect() {
public func disconnect() {
Logger.d("Streaming Connection Handler - Disconnecting SSE client")
let disconnectingClientId = curClientId
clearClientId()
DispatchQueue.general.async { [weak self] in
DispatchQueue.global().async { [weak self] in
guard let self = self else { return }
guard let clientId = disconnectingClientId else { return }
let cli = self.getSseClient(id: clientId)
Expand All @@ -42,7 +44,7 @@ class SseConnectionHandler: @unchecked Sendable {
}
}

func destroy() {
public func destroy() {
for client in clients.takeAll().values {
client.disconnect()
}
Expand Down
7 changes: 7 additions & 0 deletions Sources/Streaming/SseHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import Foundation

public protocol SseHandler: AnyObject {
func isConnectionConfirmed(message: [String: String]) -> Bool
func handleIncomingMessage(message: [String: String])
func reportError(isRetryable: Bool)
}
1 change: 1 addition & 0 deletions Sources/Streaming/Streaming.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
import Foundation
9 changes: 9 additions & 0 deletions Sources/Streaming/Tests/StreamingTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import XCTest
@testable import Streaming

final class StreamingTests: XCTestCase {
func testExample() {
// Add your tests here
XCTAssertTrue(true)
}
}
Loading
Loading