diff --git a/app/build.gradle.kts b/app/build.gradle.kts index 7efa7c14eb..580cd27ef9 100644 --- a/app/build.gradle.kts +++ b/app/build.gradle.kts @@ -25,8 +25,8 @@ configurations.configureEach { exclude(module = "commons-logging") } -val canonicalVersionCode = 439 -val canonicalVersionName = "1.31.2" +val canonicalVersionCode = 440 +val canonicalVersionName = "1.31.3" val postFixSize = 10 val abiPostFix = mapOf( diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt index 2996f7ac34..0df067a081 100644 --- a/app/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt @@ -245,7 +245,7 @@ class MessageSender @Inject constructor( } // One-on-One Chats & Closed Groups - private suspend fun sendToSnodeDestination(destination: Destination, message: Message, isSyncMessage: Boolean = false) = supervisorScope { + private suspend fun sendToSnodeDestination(destination: Destination, message: Message, isSyncMessage: Boolean = false) { // Set the failure handler (need it here already for precondition failure handling) fun handleFailure(error: Exception) { handleFailedMessageSend(message, error, isSyncMessage) diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt index 0a8df17faf..ffb60f7845 100644 --- a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt @@ -3,8 +3,10 @@ package org.session.libsession.messaging.sending_receiving.pollers import dagger.assisted.Assisted import dagger.assisted.AssistedFactory import dagger.assisted.AssistedInject +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.launch +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.async import kotlinx.coroutines.supervisorScope import kotlinx.coroutines.sync.Semaphore import kotlinx.coroutines.sync.withPermit @@ -111,6 +113,8 @@ class OpenGroupPoller @AssistedInject constructor( caps = fetched.capabilities } + val allTasks = mutableListOf>>() + for (room in rooms) { val address = Address.Community(serverUrl = server, room = room) val latestRoomPollInfo = communityDatabase.getRoomInfo(address) @@ -118,7 +122,7 @@ class OpenGroupPoller @AssistedInject constructor( val lastMessageServerId = storage.getLastMessageServerID(room, server) // Poll room info - launch { + allTasks += "polling room info" to async { val roomInfo = communityApiExecutor.execute( CommunityApiRequest( serverBaseUrl = server, @@ -137,7 +141,7 @@ class OpenGroupPoller @AssistedInject constructor( } // Poll room messages - launch { + allTasks += "polling room messages" to async { val messages = communityApiExecutor.execute( CommunityApiRequest( serverBaseUrl = server, @@ -158,7 +162,7 @@ class OpenGroupPoller @AssistedInject constructor( // We'll only poll our index if we are accepting community requests if (storage.isCheckingCommunityRequests()) { // Poll inbox messages - launch { + allTasks += "polling inbox messages" to async { val inboxMessages = communityApiExecutor.execute( CommunityApiRequest( serverBaseUrl = server, @@ -175,7 +179,7 @@ class OpenGroupPoller @AssistedInject constructor( } // Poll outbox messages regardless because these are messages we sent - launch { + allTasks += "polling outbox messages" to async { val outboxMessages = communityApiExecutor.execute( CommunityApiRequest( serverBaseUrl = server, @@ -190,6 +194,27 @@ class OpenGroupPoller @AssistedInject constructor( handleOutboxMessages(messages = outboxMessages) } } + + /** + * Await on all tasks and gather the first exception with the rest errors suppressed. + */ + val accumulatedError = allTasks + .fold(null) { acc: Throwable?, (taskName, deferred) -> + val err = runCatching { deferred.await() } + .onFailure { if (it is CancellationException) throw it } + .exceptionOrNull() + ?.let { RuntimeException("Error $taskName", it) } + + if (err != null) { + acc?.apply { addSuppressed(err) } ?: err + } else { + acc + } + } + + if (accumulatedError != null) { + throw accumulatedError + } } } diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerManager.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerManager.kt index ea33a6460d..47a978cf71 100644 --- a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerManager.kt +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerManager.kt @@ -1,9 +1,10 @@ package org.session.libsession.messaging.sending_receiving.pollers -import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.StateFlow @@ -12,8 +13,6 @@ import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.scan import kotlinx.coroutines.flow.stateIn -import kotlinx.coroutines.joinAll -import kotlinx.coroutines.launch import kotlinx.coroutines.supervisorScope import kotlinx.coroutines.sync.Semaphore import org.session.libsession.utilities.ConfigFactoryProtocol @@ -98,17 +97,11 @@ class OpenGroupPollerManager @Inject constructor( suspend fun pollAllOpenGroupsOnce() { Log.d(TAG, "Polling all open groups once") supervisorScope { - pollers.value.map { (server, handle) -> - handle.pollerScope.launch { - runCatching { - handle.poller.manualPollOnce() - }.onFailure { - if (it !is CancellationException) { - Log.e(TAG, "Error polling open group $server", it) - } - } + pollers.value.map { (_, handle) -> + async { + handle.poller.manualPollOnce() } - }.joinAll() + }.awaitAll() } } diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt index 2160296fc3..60b88472a8 100644 --- a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt @@ -272,6 +272,8 @@ class Poller @AssistedInject constructor( ) ) } catch (e: Exception) { + if (e is CancellationException) throw e + Log.e(logTag, "Error while extending TTL for hashes", e) } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/configs/ConfigUploader.kt b/app/src/main/java/org/thoughtcrime/securesms/configs/ConfigUploader.kt index 4a051ee671..a16d4a5f5b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/configs/ConfigUploader.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/configs/ConfigUploader.kt @@ -1,5 +1,6 @@ package org.thoughtcrime.securesms.configs +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.async @@ -114,6 +115,8 @@ class ConfigUploader @Inject constructor( pushUserConfigChangesIfNeeded() } } catch (e: Exception) { + if (e is CancellationException) throw e + Log.e(TAG, "Failed to push user configs", e) } } @@ -142,6 +145,8 @@ class ConfigUploader @Inject constructor( pushGroupConfigsChangesIfNeeded(groupId) } } catch (e: Exception) { + if (e is CancellationException) throw e + Log.e(TAG, "Failed to push group configs", e) } }