Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions RealmTest.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@

/* Begin PBXBuildFile section */
00144D1B2666668D00EE7664 /* UserStorage.swift in Sources */ = {isa = PBXBuildFile; fileRef = 00144D1A2666668D00EE7664 /* UserStorage.swift */; };
001FC00826CBED5F00795791 /* ListenRealm.swift in Sources */ = {isa = PBXBuildFile; fileRef = 001FC00726CBED5F00795791 /* ListenRealm.swift */; };
0026291926B1DDBC00668D1F /* main.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0026291826B1DDBC00668D1F /* main.swift */; };
0026291D26B1DDE800668D1F /* TestsAppDelegate.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0026291C26B1DDE800668D1F /* TestsAppDelegate.swift */; };
0039B01326CCDB3F006B8DA2 /* ThreadPool.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0039B01226CCDB3F006B8DA2 /* ThreadPool.swift */; };
0039B01526CCDBD6006B8DA2 /* RunLoopThreadWorker.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0039B01426CCDBD6006B8DA2 /* RunLoopThreadWorker.swift */; };
0065838426C53B97005326B0 /* AppUserContainer.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0065838326C53B97005326B0 /* AppUserContainer.swift */; };
0065839026C53D28005326B0 /* UserContainer.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0065838F26C53D28005326B0 /* UserContainer.swift */; };
0065839226C53D55005326B0 /* KeyedUserContainer.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0065839126C53D55005326B0 /* KeyedUserContainer.swift */; };
Expand Down Expand Up @@ -55,8 +58,11 @@

/* Begin PBXFileReference section */
00144D1A2666668D00EE7664 /* UserStorage.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UserStorage.swift; sourceTree = "<group>"; };
001FC00726CBED5F00795791 /* ListenRealm.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ListenRealm.swift; sourceTree = "<group>"; };
0026291826B1DDBC00668D1F /* main.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = main.swift; sourceTree = "<group>"; };
0026291C26B1DDE800668D1F /* TestsAppDelegate.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = TestsAppDelegate.swift; sourceTree = "<group>"; };
0039B01226CCDB3F006B8DA2 /* ThreadPool.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ThreadPool.swift; sourceTree = "<group>"; };
0039B01426CCDBD6006B8DA2 /* RunLoopThreadWorker.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RunLoopThreadWorker.swift; sourceTree = "<group>"; };
0065838326C53B97005326B0 /* AppUserContainer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AppUserContainer.swift; sourceTree = "<group>"; };
0065838F26C53D28005326B0 /* UserContainer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UserContainer.swift; sourceTree = "<group>"; };
0065839126C53D55005326B0 /* KeyedUserContainer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = KeyedUserContainer.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -114,6 +120,15 @@
/* End PBXFrameworksBuildPhase section */

/* Begin PBXGroup section */
0039B01126CCDB33006B8DA2 /* ThreadPool */ = {
isa = PBXGroup;
children = (
0039B01226CCDB3F006B8DA2 /* ThreadPool.swift */,
0039B01426CCDBD6006B8DA2 /* RunLoopThreadWorker.swift */,
);
path = ThreadPool;
sourceTree = "<group>";
};
0065838226C53AA4005326B0 /* Models */ = {
isa = PBXGroup;
children = (
Expand Down Expand Up @@ -144,6 +159,7 @@
children = (
00BBF68C2674A13F0033844D /* Single.swift */,
0065839926C53F99005326B0 /* Publisher+.swift */,
001FC00726CBED5F00795791 /* ListenRealm.swift */,
);
path = Helpers;
sourceTree = "<group>";
Expand Down Expand Up @@ -189,6 +205,7 @@
00AAC4972660257C009F43A1 /* RealmTest */ = {
isa = PBXGroup;
children = (
0039B01126CCDB33006B8DA2 /* ThreadPool */,
0065839426C53EE0005326B0 /* Flow */,
0065839326C53EC9005326B0 /* Helpers */,
0065838226C53AA4005326B0 /* Models */,
Expand Down Expand Up @@ -386,6 +403,7 @@
buildActionMask = 2147483647;
files = (
00F6202A267881A700107F16 /* PersistenceGatewayProtocol.swift in Sources */,
0039B01326CCDB3F006B8DA2 /* ThreadPool.swift in Sources */,
0065839A26C53F99005326B0 /* Publisher+.swift in Sources */,
0026291926B1DDBC00668D1F /* main.swift in Sources */,
0065839626C53EEC005326B0 /* ListView.swift in Sources */,
Expand All @@ -396,8 +414,10 @@
00B4581426602BE700D15EFD /* User.swift in Sources */,
00A6D5C826C304CC008E0BFC /* ImmediateScheduler.swift in Sources */,
0065839826C53F04005326B0 /* ViewModel.swift in Sources */,
001FC00826CBED5F00795791 /* ListenRealm.swift in Sources */,
00BBF68D2674A13F0033844D /* Single.swift in Sources */,
00B458102660291900D15EFD /* PersistenceGateway.swift in Sources */,
0039B01526CCDBD6006B8DA2 /* RunLoopThreadWorker.swift in Sources */,
00B4581226602BD200D15EFD /* Mapper.swift in Sources */,
00A6D5CA26C304CC008E0BFC /* TestScheduler.swift in Sources */,
00F62025267880B600107F16 /* ChangesetItem.swift in Sources */,
Expand Down
18 changes: 18 additions & 0 deletions RealmTest/AppDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,24 @@ class AppDelegate: UIResponder, UIApplicationDelegate {

window?.rootViewController = nav
window?.makeKeyAndVisible()

// let count = 20
// var workers: [ThreadWorker] = []
// DispatchQueue.concurrentPerform(iterations: count) { operation in
// let worker = ThreadPool.shared.start(name: "TestThread \(operation)") {
// print("Operation #\(operation) has executed on thread \(Thread.current.name)")
// }
// workers.append(worker)
// }
//
// Thread.sleep(forTimeInterval: 2)
// print("Will start STOP operation")
//
// DispatchQueue.concurrentPerform(iterations: count) { operation in
// workers[operation].stop()
// }


return true
}
}
75 changes: 70 additions & 5 deletions RealmTest/Flow/ViewModel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import Combine
final class ViewModel: ObservableObject {
private let userStorage = UserStorage()
private var subscriptions = Set<AnyCancellable>()
private var cancel1: AnyCancellable?
private var cancel2: AnyCancellable?

private let userSubject = CurrentValueSubject<[User], Never>([])

Expand All @@ -30,12 +32,75 @@ final class ViewModel: ObservableObject {
.store(in: &subscriptions)

userStorage.saveContainer()
.flatMap { [userStorage] in
userStorage.listenChangesetContainer()
}
.sink(receiveCompletion: { _ in }, receiveValue: { [unowned self] changeset in
self.handle(changeset: changeset)
// .flatMap { [weak self] _ in
// self?.startListen()
// }
.sink(receiveCompletion: { _ in }, receiveValue: { [unowned self] _ in
self.startListen()
// self.startListen2()
})
.store(in: &subscriptions)
}

private func startListen() {
print("DEBUG: Res 1 = Will start")

cancel1 = userStorage.listenChangesetContainer()
.handleEvents(receiveCompletion: { res in
print("DEBUG: Res 1 = \(res)")
})
.sink(
receiveCompletion: { [weak self] res in
switch res {
case .failure:
self?.startListen()
case .finished:
break
}
},
receiveValue: { [unowned self] changeset in
print("update1. Thread = \(Thread.current.name)")
self.handle(changeset: changeset)
}
)

cancel2 = userStorage.listenChangesetContainer()
.handleEvents(receiveCompletion: { res in
print("Res 2 = \(res)")
})
.sink(
receiveCompletion: { [weak self] res in
switch res {
case .failure:
print("cancel1 = \(self!.cancel1), cancel2 = \(self!.cancel2),")
case .finished:
break
}
},
receiveValue: { [unowned self] changeset in
print("update2. Thread = \(Thread.current.name)")
}
)
}

private func startListen2() {
userStorage.listenChangesetContainer()
.handleEvents(receiveCompletion: { res in
print("Res 2 = \(res)")
})
.sink(
receiveCompletion: { [weak self] res in
switch res {
case .failure:
self?.startListen2()
case .finished:
break
}
},
receiveValue: { [unowned self] changeset in
print("update2. Thread = \(Thread.current.name)")
}
)
.store(in: &subscriptions)
}

Expand Down
106 changes: 106 additions & 0 deletions RealmTest/Helpers/ListenRealm.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
//
// ListenRealm.swift
// RealmTest
//
// Created by Vladislav Sedinkin on 17.08.2021.
//

import Combine
import RealmSwift
import Foundation

enum ListenOn {
case scheduler(AnySchedulerOf<RunLoop>)
case thread
}

extension Publishers {
struct ListenRealm: Publisher {
typealias Output = Realm
typealias Failure = Error

let config: Realm.Configuration
let listenOn: ListenOn

init(config: Realm.Configuration, listenOn: ListenOn) {
self.config = config
self.listenOn = listenOn
}

func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
let subscription = Inner(publisher: self, subscriber: subscriber)
subscriber.receive(subscription: subscription)
}
}
}

private extension Publishers.ListenRealm {
final class Inner<S: Subscriber> where S.Input == Output, S.Failure == Failure {
typealias Input = Realm.Configuration
typealias Failure = Error

private var publisher: Publishers.ListenRealm?
private var subscriber: S?
private var worker: ThreadWorker?

init(publisher: Publishers.ListenRealm, subscriber: S) {
self.publisher = publisher
self.subscriber = subscriber

switch publisher.listenOn {
case .thread:
worker = ThreadPool.shared.start(name: "ListenBGRealm") { [weak self] in
self?.createRealm()
}
case let .scheduler(scheduler):
scheduler.schedule { [weak self] in
self?.createRealm()
}
}
}

private func createRealm() {
guard let publisher = publisher else {
return
}

do {
let _realm = try Realm(configuration: publisher.config)
Swift.print("DEBUG: New realm")
_ = subscriber?.receive(_realm)
} catch {
Swift.print("DEBUG: New realm error \(error)")
subscriber?.receive(completion: .failure(error))
}
}

deinit {
Swift.print("here")
}
}
}

extension Publishers.ListenRealm.Inner: Subscription {
func request(_ demand: Subscribers.Demand) {}

func cancel() {
worker?.stop()
subscriber = nil
publisher = nil
}
}

extension Publishers.ListenRealm.Inner: Subscriber {
func receive(subscription: Subscription) {
subscription.request(.max(1))
}

func receive(_ input: Realm.Configuration) -> Subscribers.Demand {
return .none
}

func receive(completion: Subscribers.Completion<Error>) {
Swift.print("Receive completion \(completion)")
// subscriber?.receive(completion: completion)
}
}
2 changes: 0 additions & 2 deletions RealmTest/Helpers/Single.swift
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,10 @@ fileprivate final class SingleSink<Upstream: Publisher, Downstream: Subscriber>:
}

func receive(subscription: Subscription) {
print(subscription)
subscription.request(.max(1))
}

func receive(_ input: Upstream.Output) -> Subscribers.Demand {
print(input)
element = input
_ = downstream.receive(input)
downstream.receive(completion: .finished)
Expand Down
Loading