diff --git a/RealmTest.xcodeproj/project.pbxproj b/RealmTest.xcodeproj/project.pbxproj index eb81cec..5a8c5c8 100644 --- a/RealmTest.xcodeproj/project.pbxproj +++ b/RealmTest.xcodeproj/project.pbxproj @@ -8,8 +8,11 @@ /* Begin PBXBuildFile section */ 00144D1B2666668D00EE7664 /* UserStorage.swift in Sources */ = {isa = PBXBuildFile; fileRef = 00144D1A2666668D00EE7664 /* UserStorage.swift */; }; + 001FC00826CBED5F00795791 /* ListenRealm.swift in Sources */ = {isa = PBXBuildFile; fileRef = 001FC00726CBED5F00795791 /* ListenRealm.swift */; }; 0026291926B1DDBC00668D1F /* main.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0026291826B1DDBC00668D1F /* main.swift */; }; 0026291D26B1DDE800668D1F /* TestsAppDelegate.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0026291C26B1DDE800668D1F /* TestsAppDelegate.swift */; }; + 0039B01326CCDB3F006B8DA2 /* ThreadPool.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0039B01226CCDB3F006B8DA2 /* ThreadPool.swift */; }; + 0039B01526CCDBD6006B8DA2 /* RunLoopThreadWorker.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0039B01426CCDBD6006B8DA2 /* RunLoopThreadWorker.swift */; }; 0065838426C53B97005326B0 /* AppUserContainer.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0065838326C53B97005326B0 /* AppUserContainer.swift */; }; 0065839026C53D28005326B0 /* UserContainer.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0065838F26C53D28005326B0 /* UserContainer.swift */; }; 0065839226C53D55005326B0 /* KeyedUserContainer.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0065839126C53D55005326B0 /* KeyedUserContainer.swift */; }; @@ -55,8 +58,11 @@ /* Begin PBXFileReference section */ 00144D1A2666668D00EE7664 /* UserStorage.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UserStorage.swift; sourceTree = ""; }; + 001FC00726CBED5F00795791 /* ListenRealm.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ListenRealm.swift; sourceTree = ""; }; 0026291826B1DDBC00668D1F /* main.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = main.swift; sourceTree = ""; }; 0026291C26B1DDE800668D1F /* TestsAppDelegate.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = TestsAppDelegate.swift; sourceTree = ""; }; + 0039B01226CCDB3F006B8DA2 /* ThreadPool.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ThreadPool.swift; sourceTree = ""; }; + 0039B01426CCDBD6006B8DA2 /* RunLoopThreadWorker.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RunLoopThreadWorker.swift; sourceTree = ""; }; 0065838326C53B97005326B0 /* AppUserContainer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AppUserContainer.swift; sourceTree = ""; }; 0065838F26C53D28005326B0 /* UserContainer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UserContainer.swift; sourceTree = ""; }; 0065839126C53D55005326B0 /* KeyedUserContainer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = KeyedUserContainer.swift; sourceTree = ""; }; @@ -114,6 +120,15 @@ /* End PBXFrameworksBuildPhase section */ /* Begin PBXGroup section */ + 0039B01126CCDB33006B8DA2 /* ThreadPool */ = { + isa = PBXGroup; + children = ( + 0039B01226CCDB3F006B8DA2 /* ThreadPool.swift */, + 0039B01426CCDBD6006B8DA2 /* RunLoopThreadWorker.swift */, + ); + path = ThreadPool; + sourceTree = ""; + }; 0065838226C53AA4005326B0 /* Models */ = { isa = PBXGroup; children = ( @@ -144,6 +159,7 @@ children = ( 00BBF68C2674A13F0033844D /* Single.swift */, 0065839926C53F99005326B0 /* Publisher+.swift */, + 001FC00726CBED5F00795791 /* ListenRealm.swift */, ); path = Helpers; sourceTree = ""; @@ -189,6 +205,7 @@ 00AAC4972660257C009F43A1 /* RealmTest */ = { isa = PBXGroup; children = ( + 0039B01126CCDB33006B8DA2 /* ThreadPool */, 0065839426C53EE0005326B0 /* Flow */, 0065839326C53EC9005326B0 /* Helpers */, 0065838226C53AA4005326B0 /* Models */, @@ -386,6 +403,7 @@ buildActionMask = 2147483647; files = ( 00F6202A267881A700107F16 /* PersistenceGatewayProtocol.swift in Sources */, + 0039B01326CCDB3F006B8DA2 /* ThreadPool.swift in Sources */, 0065839A26C53F99005326B0 /* Publisher+.swift in Sources */, 0026291926B1DDBC00668D1F /* main.swift in Sources */, 0065839626C53EEC005326B0 /* ListView.swift in Sources */, @@ -396,8 +414,10 @@ 00B4581426602BE700D15EFD /* User.swift in Sources */, 00A6D5C826C304CC008E0BFC /* ImmediateScheduler.swift in Sources */, 0065839826C53F04005326B0 /* ViewModel.swift in Sources */, + 001FC00826CBED5F00795791 /* ListenRealm.swift in Sources */, 00BBF68D2674A13F0033844D /* Single.swift in Sources */, 00B458102660291900D15EFD /* PersistenceGateway.swift in Sources */, + 0039B01526CCDBD6006B8DA2 /* RunLoopThreadWorker.swift in Sources */, 00B4581226602BD200D15EFD /* Mapper.swift in Sources */, 00A6D5CA26C304CC008E0BFC /* TestScheduler.swift in Sources */, 00F62025267880B600107F16 /* ChangesetItem.swift in Sources */, diff --git a/RealmTest/AppDelegate.swift b/RealmTest/AppDelegate.swift index 0d34f06..cfa1476 100644 --- a/RealmTest/AppDelegate.swift +++ b/RealmTest/AppDelegate.swift @@ -21,6 +21,24 @@ class AppDelegate: UIResponder, UIApplicationDelegate { window?.rootViewController = nav window?.makeKeyAndVisible() + +// let count = 20 +// var workers: [ThreadWorker] = [] +// DispatchQueue.concurrentPerform(iterations: count) { operation in +// let worker = ThreadPool.shared.start(name: "TestThread \(operation)") { +// print("Operation #\(operation) has executed on thread \(Thread.current.name)") +// } +// workers.append(worker) +// } +// +// Thread.sleep(forTimeInterval: 2) +// print("Will start STOP operation") +// +// DispatchQueue.concurrentPerform(iterations: count) { operation in +// workers[operation].stop() +// } + + return true } } diff --git a/RealmTest/Flow/ViewModel.swift b/RealmTest/Flow/ViewModel.swift index 4e53b91..09f1b62 100644 --- a/RealmTest/Flow/ViewModel.swift +++ b/RealmTest/Flow/ViewModel.swift @@ -11,6 +11,8 @@ import Combine final class ViewModel: ObservableObject { private let userStorage = UserStorage() private var subscriptions = Set() + private var cancel1: AnyCancellable? + private var cancel2: AnyCancellable? private let userSubject = CurrentValueSubject<[User], Never>([]) @@ -30,12 +32,75 @@ final class ViewModel: ObservableObject { .store(in: &subscriptions) userStorage.saveContainer() - .flatMap { [userStorage] in - userStorage.listenChangesetContainer() - } - .sink(receiveCompletion: { _ in }, receiveValue: { [unowned self] changeset in - self.handle(changeset: changeset) +// .flatMap { [weak self] _ in +// self?.startListen() +// } + .sink(receiveCompletion: { _ in }, receiveValue: { [unowned self] _ in + self.startListen() +// self.startListen2() + }) + .store(in: &subscriptions) + } + + private func startListen() { + print("DEBUG: Res 1 = Will start") + + cancel1 = userStorage.listenChangesetContainer() + .handleEvents(receiveCompletion: { res in + print("DEBUG: Res 1 = \(res)") + }) + .sink( + receiveCompletion: { [weak self] res in + switch res { + case .failure: + self?.startListen() + case .finished: + break + } + }, + receiveValue: { [unowned self] changeset in + print("update1. Thread = \(Thread.current.name)") + self.handle(changeset: changeset) + } + ) + + cancel2 = userStorage.listenChangesetContainer() + .handleEvents(receiveCompletion: { res in + print("Res 2 = \(res)") }) + .sink( + receiveCompletion: { [weak self] res in + switch res { + case .failure: + print("cancel1 = \(self!.cancel1), cancel2 = \(self!.cancel2),") + case .finished: + break + } + }, + receiveValue: { [unowned self] changeset in + print("update2. Thread = \(Thread.current.name)") + } + ) + } + + private func startListen2() { + userStorage.listenChangesetContainer() + .handleEvents(receiveCompletion: { res in + print("Res 2 = \(res)") + }) + .sink( + receiveCompletion: { [weak self] res in + switch res { + case .failure: + self?.startListen2() + case .finished: + break + } + }, + receiveValue: { [unowned self] changeset in + print("update2. Thread = \(Thread.current.name)") + } + ) .store(in: &subscriptions) } diff --git a/RealmTest/Helpers/ListenRealm.swift b/RealmTest/Helpers/ListenRealm.swift new file mode 100644 index 0000000..a9affbb --- /dev/null +++ b/RealmTest/Helpers/ListenRealm.swift @@ -0,0 +1,106 @@ +// +// ListenRealm.swift +// RealmTest +// +// Created by Vladislav Sedinkin on 17.08.2021. +// + +import Combine +import RealmSwift +import Foundation + +enum ListenOn { + case scheduler(AnySchedulerOf) + case thread +} + +extension Publishers { + struct ListenRealm: Publisher { + typealias Output = Realm + typealias Failure = Error + + let config: Realm.Configuration + let listenOn: ListenOn + + init(config: Realm.Configuration, listenOn: ListenOn) { + self.config = config + self.listenOn = listenOn + } + + func receive(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input { + let subscription = Inner(publisher: self, subscriber: subscriber) + subscriber.receive(subscription: subscription) + } + } +} + +private extension Publishers.ListenRealm { + final class Inner where S.Input == Output, S.Failure == Failure { + typealias Input = Realm.Configuration + typealias Failure = Error + + private var publisher: Publishers.ListenRealm? + private var subscriber: S? + private var worker: ThreadWorker? + + init(publisher: Publishers.ListenRealm, subscriber: S) { + self.publisher = publisher + self.subscriber = subscriber + + switch publisher.listenOn { + case .thread: + worker = ThreadPool.shared.start(name: "ListenBGRealm") { [weak self] in + self?.createRealm() + } + case let .scheduler(scheduler): + scheduler.schedule { [weak self] in + self?.createRealm() + } + } + } + + private func createRealm() { + guard let publisher = publisher else { + return + } + + do { + let _realm = try Realm(configuration: publisher.config) + Swift.print("DEBUG: New realm") + _ = subscriber?.receive(_realm) + } catch { + Swift.print("DEBUG: New realm error \(error)") + subscriber?.receive(completion: .failure(error)) + } + } + + deinit { + Swift.print("here") + } + } +} + +extension Publishers.ListenRealm.Inner: Subscription { + func request(_ demand: Subscribers.Demand) {} + + func cancel() { + worker?.stop() + subscriber = nil + publisher = nil + } +} + +extension Publishers.ListenRealm.Inner: Subscriber { + func receive(subscription: Subscription) { + subscription.request(.max(1)) + } + + func receive(_ input: Realm.Configuration) -> Subscribers.Demand { + return .none + } + + func receive(completion: Subscribers.Completion) { + Swift.print("Receive completion \(completion)") +// subscriber?.receive(completion: completion) + } +} diff --git a/RealmTest/Helpers/Single.swift b/RealmTest/Helpers/Single.swift index 971a81f..023c2fe 100644 --- a/RealmTest/Helpers/Single.swift +++ b/RealmTest/Helpers/Single.swift @@ -50,12 +50,10 @@ fileprivate final class SingleSink: } func receive(subscription: Subscription) { - print(subscription) subscription.request(.max(1)) } func receive(_ input: Upstream.Output) -> Subscribers.Demand { - print(input) element = input _ = downstream.receive(input) downstream.receive(completion: .finished) diff --git a/RealmTest/Realm/PersistenceGateway.swift b/RealmTest/Realm/PersistenceGateway.swift index 66a80de..6f1b7e2 100644 --- a/RealmTest/Realm/PersistenceGateway.swift +++ b/RealmTest/Realm/PersistenceGateway.swift @@ -16,76 +16,104 @@ final class PersistenceGateway: PersistenceGatewayProtocol { private let configuration: Realm.Configuration private let regularScheduler: AnySchedulerOf private let listenScheduler: AnySchedulerOf + private let listenOn: ListenOn + +// func getRealm() -> AnyPublisher { +// return Deferred { +// return Future { [unowned self] promise in +// _ = ThreadPool.shared.start(name: "RealmListen") { +// let realm = try! Realm(configuration: self.configuration) +// promise(.success(realm)) +// } +// } +// }.eraseToAnyPublisher() +// } init( regularScheduler: AnySchedulerOf, listenScheduler: AnySchedulerOf = .main, + listenOn: ListenOn = .thread, configuration: Realm.Configuration = .init() ) { self.regularScheduler = regularScheduler self.listenScheduler = listenScheduler + self.listenOn = listenOn self.configuration = configuration } - private func realm(scheduler: S) -> AnySinglePublisher { + private func realm(scheduler: S) -> AnyPublisher { // Создание рилма в определенном потоке return Just((configuration, nil)) .receive(on: scheduler) .tryMap(Realm.init) - .eraseToAnySinglePublisher() + .eraseToAnyPublisher() } // MARK: Get - func get(mapper: M, filterBlock: @escaping GetResultBlock) -> AnySinglePublisher { + func get(mapper: M, filterBlock: @escaping GetResultBlock) -> AnyPublisher { return realm(scheduler: regularScheduler) .map { $0.objects(M.PersistenceModel.self) } .map { filterBlock($0).last.map(mapper.convert) } - .eraseToAnySinglePublisher() + .eraseToAnyPublisher() } - func getArray(mapper: M, filterBlock: @escaping GetResultBlock) -> AnySinglePublisher<[M.DomainModel], Error> { + func getArray(mapper: M, filterBlock: @escaping GetResultBlock) -> AnyPublisher<[M.DomainModel], Error> { return realm(scheduler: regularScheduler) .map { $0.objects(M.PersistenceModel.self) } .map { filterBlock($0) } .map { $0.map(mapper.convert) } - .eraseToAnySinglePublisher() + .eraseToAnyPublisher() } // MARK: Listen func listen(mapper: M, filterBlock: @escaping GetResultBlock) -> AnyPublisher { - return realm(scheduler: listenScheduler) + return Publishers.ListenRealm(config: configuration, listenOn: listenOn) .map { $0.objects(M.PersistenceModel.self) } // Получает список объектов для типа .map { filterBlock($0) } // Фильтрует список объектов для получения только интересующего объекта .flatMap(\.collectionPublisher) // Наблюдает за изменением фильтрованных объектов. Работает даже, если объект не существовал на момент подписки - .freeze() - .receive(on: regularScheduler) .compactMap { $0.last } // Результат может содержать массив объектов, если поиск осуществлялся не по primary key, либо, если primary key нет вовсе. // Для обработки ситуации, когда нет primary key берется `last`, а не `first` .map(mapper.convert) .eraseToAnyPublisher() } + var count = 0 + enum Err: Error { case err } + func listenOrderedArrayChanges( _ sourceType: Source.Type, mapper: Target, filterBlock: @escaping (Results) -> List?, comparator: @escaping (Target.DomainModel, Target.DomainModel) -> Bool ) -> AnyPublisher, Error> { - return realm(scheduler: listenScheduler) + return Publishers.ListenRealm(config: configuration, listenOn: listenOn) +// return getRealm() .map { $0.objects(Source.PersistenceModel.self) } + .handleEvents(receiveOutput: { _ in Swift.print("DEBUG: Item get") }) .flatMap { $0.collectionPublisher + .handleEvents(receiveOutput: { Swift.print("DEBUG: After collection is empty \($0.isEmpty)") }) .filter { !$0.isEmpty } + .handleEvents(receiveOutput: { _ in Swift.print("DEBUG: After filter") }) .prefix(1) } + .handleEvents(receiveOutput: { _ in Swift.print("DEBUG: After flat") }) .compactMap { filterBlock($0) } .flatMap(\.collectionPublisher) - .freeze() - .receive(on: regularScheduler) - .map { $0.map(mapper.convert) } + .handleEvents(receiveOutput: { _ in Swift.print("DEBUG: After last flat") }) + .tryMap { [unowned self] objects -> [Target.DomainModel] in + self.count += 1 + if count % 4 == 0 { + throw Err.err + } else { + return objects.map(mapper.convert) + } + } +// .map { $0.map(mapper.convert) } .diff(comparator: comparator) + .receive(on: regularScheduler) .eraseToAnyPublisher() } @@ -106,12 +134,10 @@ final class PersistenceGateway: PersistenceGatewayProtocol { range: Range?, filterBlock: @escaping GetResultBlock ) -> AnyPublisher<[M.DomainModel], Error> { - return realm(scheduler: listenScheduler) + return Publishers.ListenRealm(config: configuration, listenOn: listenOn) .map { $0.objects(M.PersistenceModel.self) } .map { filterBlock($0) } .flatMap(\.collectionPublisher) - .freeze() - .receive(on: regularScheduler) .map { results -> [M.PersistenceModel] in // Если range существует - получаем слайс из коллекции, иначе берём коллекцию целиком let slice = range.map { $0.clamped(to: 0..(object: M.Model, mapper: M, update: Realm.UpdatePolicy) -> AnySinglePublisher { + func save(object: M.Model, mapper: M, update: Realm.UpdatePolicy) -> AnyPublisher { return realm(scheduler: regularScheduler) .tryMap { realm in let persistence = mapper.convert(model: object) @@ -136,10 +162,10 @@ final class PersistenceGateway: PersistenceGatewayProtocol { return () } - .eraseToAnySinglePublisher() + .eraseToAnyPublisher() } - func save(objects: [M.Model], mapper: M, update: Realm.UpdatePolicy) -> AnySinglePublisher { + func save(objects: [M.Model], mapper: M, update: Realm.UpdatePolicy) -> AnyPublisher { return realm(scheduler: regularScheduler) .tryMap { realm in let persistenceObjects = objects.map(mapper.convert) @@ -152,12 +178,12 @@ final class PersistenceGateway: PersistenceGatewayProtocol { return () } - .eraseToAnySinglePublisher() + .eraseToAnyPublisher() } // MARK: Delete - func delete(_ type: M.Type, deleteHandler: @escaping SaveResultBlock) -> AnySinglePublisher { + func delete(_ type: M.Type, deleteHandler: @escaping SaveResultBlock) -> AnyPublisher { return realm(scheduler: regularScheduler) .tryMap { realm in let objects = realm.objects(M.PersistenceModel.self) @@ -170,7 +196,7 @@ final class PersistenceGateway: PersistenceGatewayProtocol { return () } - .eraseToAnySinglePublisher() + .eraseToAnyPublisher() } func deleteAll() { @@ -185,7 +211,7 @@ final class PersistenceGateway: PersistenceGatewayProtocol { // MARK: Action - func updateAction(_ action: @escaping (Realm) throws -> Void) -> AnySinglePublisher { + func updateAction(_ action: @escaping (Realm) throws -> Void) -> AnyPublisher { return realm(scheduler: regularScheduler) .tryMap { realm in try realm.safeWrite { @@ -193,14 +219,14 @@ final class PersistenceGateway: PersistenceGatewayProtocol { } realm.refresh() } - .eraseToAnySinglePublisher() + .eraseToAnyPublisher() } - func count(_ type: T.Type, filterBlock: @escaping SaveResultBlock) -> AnySinglePublisher { + func count(_ type: T.Type, filterBlock: @escaping SaveResultBlock) -> AnyPublisher { return realm(scheduler: regularScheduler) .map { $0.objects(T.PersistenceModel.self) } .compactMap { filterBlock($0).count } - .eraseToAnySinglePublisher() + .eraseToAnyPublisher() } } diff --git a/RealmTest/Realm/PersistenceGatewayProtocol.swift b/RealmTest/Realm/PersistenceGatewayProtocol.swift index 9f8ed7e..5719334 100644 --- a/RealmTest/Realm/PersistenceGatewayProtocol.swift +++ b/RealmTest/Realm/PersistenceGatewayProtocol.swift @@ -11,19 +11,19 @@ import RealmSwift protocol PersistenceGatewayProtocol: AnyObject { /// Позволяет выполнять любое действие с рилмом. Все действия происходит в транзакции записи - func updateAction(_ action: @escaping (Realm) throws -> Void) -> AnySinglePublisher + func updateAction(_ action: @escaping (Realm) throws -> Void) -> AnyPublisher /// Получает объект из рилма /// - Parameters: /// - mapper: маппер для конвертации объекта рилма в доменный объект /// - filterBlock: фильтр, который является основным инструментом поиска нужного элемента - func get(mapper: M, filterBlock: @escaping GetResultBlock) -> AnySinglePublisher + func get(mapper: M, filterBlock: @escaping GetResultBlock) -> AnyPublisher /// Получает массив объектов из рилма /// - Parameters: /// - mapper: маппер для конвертации объектов рилма в доменные объекты /// - filterBlock: фильтр, который является основным инструментом поиска нужных элементов - func getArray(mapper: M, filterBlock: @escaping GetResultBlock) -> AnySinglePublisher<[M.DomainModel], Error> + func getArray(mapper: M, filterBlock: @escaping GetResultBlock) -> AnyPublisher<[M.DomainModel], Error> /// Наблюдает за изменением объекта. Наблюдение будет валидно, даже, если объект не существовал на момент начала наблюдение и появился после /// - Parameters: @@ -75,36 +75,36 @@ protocol PersistenceGatewayProtocol: AnyObject { /// - Parameters: /// - object: объект для сохранения /// - mapper: маппер для конвертации доменного объекта в рилм - func save(object: M.Model, mapper: M, update: Realm.UpdatePolicy) -> AnySinglePublisher + func save(object: M.Model, mapper: M, update: Realm.UpdatePolicy) -> AnyPublisher /// Сохраняет массив объектов в базу /// - Parameters: /// - objects: объект для сохранения /// - mapper: маппер для конвертации доменных объектов в рилм - func save(objects: [M.Model], mapper: M, update: Realm.UpdatePolicy) -> AnySinglePublisher + func save(objects: [M.Model], mapper: M, update: Realm.UpdatePolicy) -> AnyPublisher /// Удаляет объект, соответствующий фильтру, из базы /// - Parameters: /// - type: тип удаляемого объекта /// - deleteHandler: поиск удаляемого объекта - func delete(_ type: M.Type, deleteHandler: @escaping SaveResultBlock) -> AnySinglePublisher + func delete(_ type: M.Type, deleteHandler: @escaping SaveResultBlock) -> AnyPublisher /// Возвращает количество объектов в базе, удовлетворяющих фильтру /// - Parameters: /// - type: тип объектов для определения количества /// - filterBlock: блок фильтрации - func count(_ type: T.Type, filterBlock: @escaping SaveResultBlock) -> AnySinglePublisher + func count(_ type: T.Type, filterBlock: @escaping SaveResultBlock) -> AnyPublisher /// Очищает рилм func deleteAll() } extension PersistenceGatewayProtocol { - func get(mapper: M) -> AnySinglePublisher { + func get(mapper: M) -> AnyPublisher { get(mapper: mapper) { $0 } } - func getArray(mapper: M, filterBlock: @escaping GetResultBlock) -> AnySinglePublisher<[M.DomainModel], Error> { + func getArray(mapper: M, filterBlock: @escaping GetResultBlock) -> AnyPublisher<[M.DomainModel], Error> { getArray(mapper: mapper) { $0 } } @@ -116,15 +116,15 @@ extension PersistenceGatewayProtocol { listenArray(mapper: mapper, range: nil) { $0 } } - func save(object: M.Model, mapper: M, update: Realm.UpdatePolicy = .all) -> AnySinglePublisher { + func save(object: M.Model, mapper: M, update: Realm.UpdatePolicy = .all) -> AnyPublisher { save(object: object, mapper: mapper, update: update) } - func save(objects: [M.Model], mapper: M, update: Realm.UpdatePolicy = .all) -> AnySinglePublisher { + func save(objects: [M.Model], mapper: M, update: Realm.UpdatePolicy = .all) -> AnyPublisher { save(objects: objects, mapper: mapper, update: update) } - func count(_ type: T.Type) -> AnySinglePublisher { + func count(_ type: T.Type) -> AnyPublisher { count(type) { $0 } } } diff --git a/RealmTest/Realm/UserStorage.swift b/RealmTest/Realm/UserStorage.swift index 5437155..943b66f 100644 --- a/RealmTest/Realm/UserStorage.swift +++ b/RealmTest/Realm/UserStorage.swift @@ -25,15 +25,15 @@ final class UserStorage { gateway = PersistenceGateway(regularScheduler: queue.eraseToAnyScheduler(), configuration: config) } - func update(user: User) -> AnySinglePublisher { + func update(user: User) -> AnyPublisher { return gateway.save(object: user, mapper: UserMapper(), update: .modified) } - func save(user: User) -> AnySinglePublisher { + func save(user: User) -> AnyPublisher { return gateway.save(object: user, mapper: UserMapper(), update: .all) } - func saveToContainer(user: User) -> AnySinglePublisher { + func saveToContainer(user: User) -> AnyPublisher { let id = containerId return gateway.updateAction { realm in let objects = realm.objects(AppRealmUserContainer.self).filter("id = %@", id) @@ -46,7 +46,7 @@ final class UserStorage { } } - func saveToContainer(users: [User]) -> AnySinglePublisher { + func saveToContainer(users: [User]) -> AnyPublisher { let id = containerId return gateway.updateAction { realm in let objects = realm.objects(AppRealmUserContainer.self).filter("id = %@", id) @@ -60,7 +60,7 @@ final class UserStorage { } } - func updateInContainer(user: User) -> AnySinglePublisher { + func updateInContainer(user: User) -> AnyPublisher { let id = containerId return gateway.updateAction { realm in let objects = realm.objects(AppRealmUserContainer.self).filter("id = %@", id) @@ -81,7 +81,7 @@ final class UserStorage { } } - func deleteFromContainer(userAt userId: UUID) -> AnySinglePublisher { + func deleteFromContainer(userAt userId: UUID) -> AnyPublisher { let id = containerId return gateway.updateAction { realm in let objects = realm.objects(AppRealmUserContainer.self).filter("id = %@", id) @@ -104,21 +104,21 @@ final class UserStorage { ) { [containerId] in $0.filter("id = %@", containerId).first?.usersList } } - func saveContainer() -> AnySinglePublisher { + func saveContainer() -> AnyPublisher { let container = AppUserContainer(id: containerId, users: []) let mapper = AppDomainRealmUserContainerMapper(userMapper: UserMapper()) return gateway.count(AppDomainRealmUserContainerMapper.self) - .flatMap { [gateway] count -> AnySinglePublisher in + .flatMap { [gateway] count -> AnyPublisher in if count == 0 { return gateway.save(object: container, mapper: mapper, update: .all) } else { - return Just(()).setFailureType(to: Error.self).eraseToAnySinglePublisher() + return Just(()).setFailureType(to: Error.self).eraseToAnyPublisher() } } - .eraseToAnySinglePublisher() + .eraseToAnyPublisher() } - func getUser(id: String) -> AnySinglePublisher { + func getUser(id: String) -> AnyPublisher { return gateway.get(mapper: RealmUserMapper()) { $0.filter("id = %@", id) } } @@ -126,7 +126,7 @@ final class UserStorage { return gateway.listen(mapper: RealmUserMapper()) { $0.filter("id = %@", id) } } - func update(id: String) -> AnySinglePublisher { + func update(id: String) -> AnyPublisher { return gateway.updateAction { realm in let user = realm.object(ofType: RealmUser.self, forPrimaryKey: id)! user.name = "update block name" @@ -136,7 +136,7 @@ final class UserStorage { } } - func delete(id: UUID) -> AnySinglePublisher { + func delete(id: UUID) -> AnyPublisher { return gateway.delete(UserMapper.self) { $0.filter("id = %@", id.uuidString) } } diff --git a/RealmTest/ThreadPool/RunLoopThreadWorker.swift b/RealmTest/ThreadPool/RunLoopThreadWorker.swift new file mode 100644 index 0000000..31caf22 --- /dev/null +++ b/RealmTest/ThreadPool/RunLoopThreadWorker.swift @@ -0,0 +1,197 @@ +// +// RunLoopThreadWorker.swift +// RealmTest +// +// Created by Vladislav Sedinkin on 18.08.2021. +// + +import Foundation + +protocol ThreadWorker: AnyObject { + func stop() +} + +class MyThread: Thread { + public enum State: String { + case waiting = "isWaiting" + case executing = "isExecuting" + case finished = "isFinished" + case cancelled = "isCancelled" + } + + open var state: State = State.waiting { + willSet { + willChangeValue(forKey: State.executing.rawValue) + willChangeValue(forKey: State.finished.rawValue) + willChangeValue(forKey: State.cancelled.rawValue) + } + didSet { + switch self.state { + case .waiting: + assert(oldValue == .waiting, "Invalid change from \(oldValue) to \(self.state)") + case .executing: + assert( + oldValue == .waiting, + "Invalid change from \(oldValue) to \(self.state)" + ) + case .finished: + // assert(oldValue != .cancelled, "Invalid change from \(oldValue) to \(self.state)") + break + case .cancelled: + break + } + + didChangeValue(forKey: State.cancelled.rawValue) + didChangeValue(forKey: State.finished.rawValue) + didChangeValue(forKey: State.executing.rawValue) + } + } + + open override var isExecuting: Bool { + if self.state == .waiting { + return super.isExecuting + } else { + return self.state == .executing + } + } + + open override var isFinished: Bool { + if self.state == .waiting { + return super.isFinished + } else { + return self.state == .finished + } + } + + open override var isCancelled: Bool { + if self.state == .waiting { + return super.isCancelled + } else { + return self.state == .cancelled + } + } + + override func main() { + print("") + while isExecuting { + RunLoop.current.run( + mode: .default, + before: .distantFuture + ) + } +// state = .finished + print("thread stopped") + } + + override func cancel() { + state = .finished + super.cancel() +// CFRunLoopStop(CFRunLoopGetCurrent()) +// +// super.cancel() + } + + deinit { + print("deinit thread \(name)") + } +} + +final class RunLoopThreadWorker: NSObject { + private let name: String + private lazy var lock = NSLock() + + private var operationsCount: UInt = 0 + private var block: (() -> Void)? + private var _thread: Thread? + private var liveThread: Thread { + if let thread = _thread { + return thread + } else { + let thread = MyThread() +// let thread = MyThread { [weak self] in +// self?.performThreadWork() +// } + thread.name = "\(name)-\(UUID().uuidString)" + _thread = thread + return thread + } + } + var isCancelled: Bool { + return _thread?.isCancelled ?? false + } + + var isExecuting: Bool { + return _thread?.isExecuting ?? false + } + + init(name: String) { + self.name = name + + super.init() + } + + func start(_ block: @escaping () -> Void) { + lock.lock() + defer { lock.unlock() } + + self.block = block + operationsCount = max(1, operationsCount &+ 1) + print("*** Start. Operations count = \(operationsCount) on thread: \(Thread.current.name) ***") + if liveThread.isFinished || liveThread.isCancelled { + assertionFailure("Try to start finished or cancelled thread") + } + if !liveThread.isExecuting { + liveThread.start() + } + + perform( + #selector(runBlock), + on: liveThread, + with: nil, + waitUntilDone: false, + modes: [RunLoop.Mode.default.rawValue] + ) + } + + deinit { + print("") + } +} + +extension RunLoopThreadWorker: ThreadWorker { + func stop() { + lock.lock() + defer { lock.unlock() } + + operationsCount = max(0, operationsCount - 1) + print("*** Stop. Operations count = \(operationsCount) on thread: \(_thread?.name) ***") + if operationsCount == 0 { + print("DEBUG: WORK WILL CANCEL. Thread exists \(_thread != nil)") + _thread?.cancel() +// isRun = false + } + } +} + +private extension RunLoopThreadWorker { + @objc func runBlock() { + block?() + } + + @objc func performThreadWork() { +// while true { +// while !(_thread?.isCancelled ?? false) { +// RunLoop.current.run( +// mode: .default, +// before: .distantFuture +// ) +// } +// print("==== WORK IS CANCELLED ====") +// lock.lock() +//// pthread_mutex_lock(&mutex) +// MyThread.exit() +// _thread = nil +// lock.unlock() +// pthread_mutex_unlock(&mutex) + } +} diff --git a/RealmTest/ThreadPool/ThreadPool.swift b/RealmTest/ThreadPool/ThreadPool.swift new file mode 100644 index 0000000..52a357c --- /dev/null +++ b/RealmTest/ThreadPool/ThreadPool.swift @@ -0,0 +1,44 @@ +// +// ThreadPool.swift +// RealmTest +// +// Created by Vladislav Sedinkin on 18.08.2021. +// + +import Foundation + +final class ThreadPool { + static let shared: ThreadPool = .init() + + private let lock = NSLock() + private var workers = NSHashTable.weakObjects() + + private init() { + } + + func start(name: String, block: @escaping () -> Void) -> ThreadWorker { + lock.lock() + defer { + lock.unlock() + } +// workers.removeAll { $0.isCancelled } + workers.allObjects.filter { !$0.isExecuting }.forEach { + workers.remove($0) + } + + let worker: RunLoopThreadWorker + if let runningWorker = workers.allObjects.first(where: \.isExecuting) { + print("DEBUG: Use existing worker for '\(name)'") + worker = runningWorker + } else { + print("DEBUG: Create new worker for '\(name)'") + worker = RunLoopThreadWorker(name: name) +// workers.allObjects.append(worker) + workers.add(worker) + } + + worker.start(block) + + return worker + } +} diff --git a/RealmTestTests/Persistence/PersistenceGatewayListenChangesetTests.swift b/RealmTestTests/Persistence/PersistenceGatewayListenChangesetTests.swift index 92b2ff6..2c89d6c 100644 --- a/RealmTestTests/Persistence/PersistenceGatewayListenChangesetTests.swift +++ b/RealmTestTests/Persistence/PersistenceGatewayListenChangesetTests.swift @@ -21,7 +21,7 @@ final class PersistenceGatewayListenChangesetTests: XCTestCase { listenScheduler = RunLoop.test let config = Realm.Configuration(inMemoryIdentifier: "in memory listen changeset test realm \(UUID().uuidString)") - persistence = PersistenceGateway(regularScheduler: .immediate, listenScheduler: listenScheduler.eraseToAnyScheduler(), configuration: config) + persistence = PersistenceGateway(regularScheduler: .immediate, listenOn: .scheduler(listenScheduler.eraseToAnyScheduler()), configuration: config) } override func tearDown() { diff --git a/RealmTestTests/Persistence/PersistenceGatewayListenTests.swift b/RealmTestTests/Persistence/PersistenceGatewayListenTests.swift index d978caa..5551280 100644 --- a/RealmTestTests/Persistence/PersistenceGatewayListenTests.swift +++ b/RealmTestTests/Persistence/PersistenceGatewayListenTests.swift @@ -21,7 +21,7 @@ final class PersistenceGatewayListenTests: XCTestCase { listenScheduler = RunLoop.test let config = Realm.Configuration(inMemoryIdentifier: "in memory listen test realm \(UUID().uuidString)") - persistence = PersistenceGateway(regularScheduler: .immediate, listenScheduler: listenScheduler.eraseToAnyScheduler(), configuration: config) + persistence = PersistenceGateway(regularScheduler: .immediate, listenOn: .scheduler(listenScheduler.eraseToAnyScheduler()), configuration: config) } override func tearDown() {