From f7e00c047fb08cab3eff08ee69dee986cac740ac Mon Sep 17 00:00:00 2001 From: SessionHero01 <180888785+SessionHero01@users.noreply.github.com> Date: Fri, 20 Feb 2026 13:32:15 +1100 Subject: [PATCH 1/5] Fix various incorrect coroutine usage --- .../sending_receiving/MessageSender.kt | 2 +- .../pollers/OpenGroupPoller.kt | 100 +++++++++++------- .../pollers/OpenGroupPollerManager.kt | 16 ++- .../sending_receiving/pollers/Poller.kt | 3 + .../securesms/configs/ConfigUploader.kt | 5 + 5 files changed, 76 insertions(+), 50 deletions(-) diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt index 2777700af3..e98e37e530 100644 --- a/app/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt @@ -244,7 +244,7 @@ class MessageSender @Inject constructor( } // One-on-One Chats & Closed Groups - private suspend fun sendToSnodeDestination(destination: Destination, message: Message, isSyncMessage: Boolean = false) = supervisorScope { + private suspend fun sendToSnodeDestination(destination: Destination, message: Message, isSyncMessage: Boolean = false) { // Set the failure handler (need it here already for precondition failure handling) fun handleFailure(error: Exception) { handleFailedMessageSend(message, error, isSyncMessage) diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt index 0a8df17faf..ef18fe8e49 100644 --- a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt @@ -3,6 +3,7 @@ package org.session.libsession.messaging.sending_receiving.pollers import dagger.assisted.Assisted import dagger.assisted.AssistedFactory import dagger.assisted.AssistedInject +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import kotlinx.coroutines.supervisorScope @@ -119,37 +120,48 @@ class OpenGroupPoller @AssistedInject constructor( // Poll room info launch { - val roomInfo = communityApiExecutor.execute( - CommunityApiRequest( - serverBaseUrl = server, - serverPubKey = serverKey, - api = pollRoomInfoFactory.create( - room = room, - infoUpdates = infoUpdates + try { + val roomInfo = communityApiExecutor.execute( + CommunityApiRequest( + serverBaseUrl = server, + serverPubKey = serverKey, + api = pollRoomInfoFactory.create( + room = room, + infoUpdates = infoUpdates + ) ) ) - ) - handleRoomPollInfo( - address = address, - pollInfoJsonText = json.encodeToString(roomInfo) - ) + handleRoomPollInfo( + address = address, + pollInfoJsonText = json.encodeToString(roomInfo) + ) + } catch (e: Throwable) { + if (e is CancellationException) throw e + Log.e(logTag, "Error polling room info") + } } // Poll room messages launch { - val messages = communityApiExecutor.execute( - CommunityApiRequest( - serverBaseUrl = server, - serverPubKey = serverKey, - api = getRoomMessagesFactory.create( - room = room, - sinceSeqNo = lastMessageServerId, + try { + val messages = communityApiExecutor.execute( + CommunityApiRequest( + serverBaseUrl = server, + serverPubKey = serverKey, + api = getRoomMessagesFactory.create( + room = room, + sinceSeqNo = lastMessageServerId, + ) ) ) - ) - handleMessages(roomToken = room, messages = messages) + handleMessages(roomToken = room, messages = messages) + } catch (e: Throwable) { + if (e is CancellationException) throw e + + Log.e(logTag, "Error polling room messages") + } } } @@ -159,35 +171,45 @@ class OpenGroupPoller @AssistedInject constructor( if (storage.isCheckingCommunityRequests()) { // Poll inbox messages launch { - val inboxMessages = communityApiExecutor.execute( - CommunityApiRequest( - serverBaseUrl = server, - serverPubKey = serverKey, - api = getDirectMessageFactory.create( - inboxOrOutbox = true, - sinceLastId = storage.getLastInboxMessageId(server), + try { + val inboxMessages = communityApiExecutor.execute( + CommunityApiRequest( + serverBaseUrl = server, + serverPubKey = serverKey, + api = getDirectMessageFactory.create( + inboxOrOutbox = true, + sinceLastId = storage.getLastInboxMessageId(server), + ) ) ) - ) - handleInboxMessages(messages = inboxMessages) + handleInboxMessages(messages = inboxMessages) + } catch (e: Throwable) { + if (e is CancellationException) throw e + Log.e(logTag, "Error polling inbox messages") + } } } // Poll outbox messages regardless because these are messages we sent launch { - val outboxMessages = communityApiExecutor.execute( - CommunityApiRequest( - serverBaseUrl = server, - serverPubKey = serverKey, - api = getDirectMessageFactory.create( - inboxOrOutbox = false, - sinceLastId = storage.getLastOutboxMessageId(server), + try { + val outboxMessages = communityApiExecutor.execute( + CommunityApiRequest( + serverBaseUrl = server, + serverPubKey = serverKey, + api = getDirectMessageFactory.create( + inboxOrOutbox = false, + sinceLastId = storage.getLastOutboxMessageId(server), + ) ) ) - ) - handleOutboxMessages(messages = outboxMessages) + handleOutboxMessages(messages = outboxMessages) + } catch (e: Throwable) { + if (e is CancellationException) throw e + Log.e(logTag, "Error polling outbox messages") + } } } } diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerManager.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerManager.kt index ea33a6460d..cc9384b7b3 100644 --- a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerManager.kt +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerManager.kt @@ -4,6 +4,8 @@ import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.StateFlow @@ -98,17 +100,11 @@ class OpenGroupPollerManager @Inject constructor( suspend fun pollAllOpenGroupsOnce() { Log.d(TAG, "Polling all open groups once") supervisorScope { - pollers.value.map { (server, handle) -> - handle.pollerScope.launch { - runCatching { - handle.poller.manualPollOnce() - }.onFailure { - if (it !is CancellationException) { - Log.e(TAG, "Error polling open group $server", it) - } - } + pollers.value.map { (_, handle) -> + async { + handle.poller.manualPollOnce() } - }.joinAll() + }.awaitAll() } } diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt index 2decc3aca3..69b8e7bcb0 100644 --- a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt @@ -3,6 +3,7 @@ package org.session.libsession.messaging.sending_receiving.pollers import dagger.assisted.Assisted import dagger.assisted.AssistedFactory import dagger.assisted.AssistedInject +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.async import kotlinx.coroutines.launch @@ -267,6 +268,8 @@ class Poller @AssistedInject constructor( ) ) } catch (e: Exception) { + if (e is CancellationException) throw e + Log.e(logTag, "Error while extending TTL for hashes", e) } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/configs/ConfigUploader.kt b/app/src/main/java/org/thoughtcrime/securesms/configs/ConfigUploader.kt index 4a051ee671..a16d4a5f5b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/configs/ConfigUploader.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/configs/ConfigUploader.kt @@ -1,5 +1,6 @@ package org.thoughtcrime.securesms.configs +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.async @@ -114,6 +115,8 @@ class ConfigUploader @Inject constructor( pushUserConfigChangesIfNeeded() } } catch (e: Exception) { + if (e is CancellationException) throw e + Log.e(TAG, "Failed to push user configs", e) } } @@ -142,6 +145,8 @@ class ConfigUploader @Inject constructor( pushGroupConfigsChangesIfNeeded(groupId) } } catch (e: Exception) { + if (e is CancellationException) throw e + Log.e(TAG, "Failed to push group configs", e) } } From 4598bf125ae67d9db0909a0e81a1106733a77d7f Mon Sep 17 00:00:00 2001 From: SessionHero01 <180888785+SessionHero01@users.noreply.github.com> Date: Fri, 20 Feb 2026 13:41:51 +1100 Subject: [PATCH 2/5] Missing throwable in the log --- .../sending_receiving/pollers/OpenGroupPoller.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt index ef18fe8e49..38c89576db 100644 --- a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt @@ -138,7 +138,7 @@ class OpenGroupPoller @AssistedInject constructor( ) } catch (e: Throwable) { if (e is CancellationException) throw e - Log.e(logTag, "Error polling room info") + Log.e(logTag, "Error polling room info", e) } } @@ -160,7 +160,7 @@ class OpenGroupPoller @AssistedInject constructor( } catch (e: Throwable) { if (e is CancellationException) throw e - Log.e(logTag, "Error polling room messages") + Log.e(logTag, "Error polling room messages", e) } } } @@ -186,7 +186,7 @@ class OpenGroupPoller @AssistedInject constructor( handleInboxMessages(messages = inboxMessages) } catch (e: Throwable) { if (e is CancellationException) throw e - Log.e(logTag, "Error polling inbox messages") + Log.e(logTag, "Error polling inbox messages", e) } } } @@ -208,7 +208,7 @@ class OpenGroupPoller @AssistedInject constructor( handleOutboxMessages(messages = outboxMessages) } catch (e: Throwable) { if (e is CancellationException) throw e - Log.e(logTag, "Error polling outbox messages") + Log.e(logTag, "Error polling outbox messages", e) } } } From 51a2d9fb3b747e019883e2ed1076ef237ef4e2bd Mon Sep 17 00:00:00 2001 From: SessionHero01 <180888785+SessionHero01@users.noreply.github.com> Date: Fri, 20 Feb 2026 14:08:11 +1100 Subject: [PATCH 3/5] Fine tune error handling for community poller --- .../pollers/OpenGroupPoller.kt | 106 ++++++++++-------- 1 file changed, 58 insertions(+), 48 deletions(-) diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt index 38c89576db..90ad1895bb 100644 --- a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt @@ -5,6 +5,8 @@ import dagger.assisted.AssistedFactory import dagger.assisted.AssistedInject import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.async import kotlinx.coroutines.launch import kotlinx.coroutines.supervisorScope import kotlinx.coroutines.sync.Semaphore @@ -112,6 +114,8 @@ class OpenGroupPoller @AssistedInject constructor( caps = fetched.capabilities } + val allTasks = mutableListOf>>() + for (room in rooms) { val address = Address.Community(serverUrl = server, room = room) val latestRoomPollInfo = communityDatabase.getRoomInfo(address) @@ -119,31 +123,26 @@ class OpenGroupPoller @AssistedInject constructor( val lastMessageServerId = storage.getLastMessageServerID(room, server) // Poll room info - launch { - try { - val roomInfo = communityApiExecutor.execute( - CommunityApiRequest( - serverBaseUrl = server, - serverPubKey = serverKey, - api = pollRoomInfoFactory.create( - room = room, - infoUpdates = infoUpdates - ) + allTasks += "polling room info" to async { + val roomInfo = communityApiExecutor.execute( + CommunityApiRequest( + serverBaseUrl = server, + serverPubKey = serverKey, + api = pollRoomInfoFactory.create( + room = room, + infoUpdates = infoUpdates ) ) + ) - handleRoomPollInfo( - address = address, - pollInfoJsonText = json.encodeToString(roomInfo) - ) - } catch (e: Throwable) { - if (e is CancellationException) throw e - Log.e(logTag, "Error polling room info", e) - } + handleRoomPollInfo( + address = address, + pollInfoJsonText = json.encodeToString(roomInfo) + ) } // Poll room messages - launch { + allTasks += "polling room messages" to async { try { val messages = communityApiExecutor.execute( CommunityApiRequest( @@ -170,47 +169,58 @@ class OpenGroupPoller @AssistedInject constructor( // We'll only poll our index if we are accepting community requests if (storage.isCheckingCommunityRequests()) { // Poll inbox messages - launch { - try { - val inboxMessages = communityApiExecutor.execute( - CommunityApiRequest( - serverBaseUrl = server, - serverPubKey = serverKey, - api = getDirectMessageFactory.create( - inboxOrOutbox = true, - sinceLastId = storage.getLastInboxMessageId(server), - ) + allTasks += "polling inbox messages" to async { + val inboxMessages = communityApiExecutor.execute( + CommunityApiRequest( + serverBaseUrl = server, + serverPubKey = serverKey, + api = getDirectMessageFactory.create( + inboxOrOutbox = true, + sinceLastId = storage.getLastInboxMessageId(server), ) ) + ) - handleInboxMessages(messages = inboxMessages) - } catch (e: Throwable) { - if (e is CancellationException) throw e - Log.e(logTag, "Error polling inbox messages", e) - } + handleInboxMessages(messages = inboxMessages) } } // Poll outbox messages regardless because these are messages we sent - launch { - try { - val outboxMessages = communityApiExecutor.execute( - CommunityApiRequest( - serverBaseUrl = server, - serverPubKey = serverKey, - api = getDirectMessageFactory.create( - inboxOrOutbox = false, - sinceLastId = storage.getLastOutboxMessageId(server), - ) + allTasks += "polling outbox messages" to async { + val outboxMessages = communityApiExecutor.execute( + CommunityApiRequest( + serverBaseUrl = server, + serverPubKey = serverKey, + api = getDirectMessageFactory.create( + inboxOrOutbox = false, + sinceLastId = storage.getLastOutboxMessageId(server), ) ) + ) - handleOutboxMessages(messages = outboxMessages) - } catch (e: Throwable) { - if (e is CancellationException) throw e - Log.e(logTag, "Error polling outbox messages", e) + handleOutboxMessages(messages = outboxMessages) + } + } + + /** + * Await on all tasks and gather the first exception with the rest errors suppressed. + */ + val accumulatedError = allTasks + .fold(null) { acc: Throwable?, (taskName, deferred) -> + val err = runCatching { deferred.await() } + .onFailure { if (it is CancellationException) throw it } + .exceptionOrNull() + ?.let { RuntimeException("Error $taskName", it) } + + if (err != null) { + acc?.apply { addSuppressed(err) } ?: err + } else { + acc } } + + if (accumulatedError != null) { + throw accumulatedError } } } From fbc0300b0e98a66eeceb68b6a077d77479386d4d Mon Sep 17 00:00:00 2001 From: SessionHero01 <180888785+SessionHero01@users.noreply.github.com> Date: Fri, 20 Feb 2026 14:10:25 +1100 Subject: [PATCH 4/5] Remove unnecessary try-catch --- .../pollers/OpenGroupPoller.kt | 25 +++++++------------ 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt index 90ad1895bb..ffb60f7845 100644 --- a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt @@ -7,7 +7,6 @@ import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred import kotlinx.coroutines.async -import kotlinx.coroutines.launch import kotlinx.coroutines.supervisorScope import kotlinx.coroutines.sync.Semaphore import kotlinx.coroutines.sync.withPermit @@ -143,24 +142,18 @@ class OpenGroupPoller @AssistedInject constructor( // Poll room messages allTasks += "polling room messages" to async { - try { - val messages = communityApiExecutor.execute( - CommunityApiRequest( - serverBaseUrl = server, - serverPubKey = serverKey, - api = getRoomMessagesFactory.create( - room = room, - sinceSeqNo = lastMessageServerId, - ) + val messages = communityApiExecutor.execute( + CommunityApiRequest( + serverBaseUrl = server, + serverPubKey = serverKey, + api = getRoomMessagesFactory.create( + room = room, + sinceSeqNo = lastMessageServerId, ) ) + ) - handleMessages(roomToken = room, messages = messages) - } catch (e: Throwable) { - if (e is CancellationException) throw e - - Log.e(logTag, "Error polling room messages", e) - } + handleMessages(roomToken = room, messages = messages) } } From 0334e76f43f876e454318cfedef882562860da3b Mon Sep 17 00:00:00 2001 From: SessionHero01 <180888785+SessionHero01@users.noreply.github.com> Date: Fri, 20 Feb 2026 14:20:43 +1100 Subject: [PATCH 5/5] Tidy up import --- .../sending_receiving/pollers/OpenGroupPollerManager.kt | 3 --- 1 file changed, 3 deletions(-) diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerManager.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerManager.kt index cc9384b7b3..47a978cf71 100644 --- a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerManager.kt +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerManager.kt @@ -1,6 +1,5 @@ package org.session.libsession.messaging.sending_receiving.pollers -import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob @@ -14,8 +13,6 @@ import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.scan import kotlinx.coroutines.flow.stateIn -import kotlinx.coroutines.joinAll -import kotlinx.coroutines.launch import kotlinx.coroutines.supervisorScope import kotlinx.coroutines.sync.Semaphore import org.session.libsession.utilities.ConfigFactoryProtocol