Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ configurations.configureEach {
exclude(module = "commons-logging")
}

val canonicalVersionCode = 439
val canonicalVersionName = "1.31.2"
val canonicalVersionCode = 440
val canonicalVersionName = "1.31.3"

val postFixSize = 10
val abiPostFix = mapOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ class MessageSender @Inject constructor(
}

// One-on-One Chats & Closed Groups
private suspend fun sendToSnodeDestination(destination: Destination, message: Message, isSyncMessage: Boolean = false) = supervisorScope {
private suspend fun sendToSnodeDestination(destination: Destination, message: Message, isSyncMessage: Boolean = false) {
// Set the failure handler (need it here already for precondition failure handling)
fun handleFailure(error: Exception) {
handleFailedMessageSend(message, error, isSyncMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package org.session.libsession.messaging.sending_receiving.pollers
import dagger.assisted.Assisted
import dagger.assisted.AssistedFactory
import dagger.assisted.AssistedInject
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
Expand Down Expand Up @@ -111,14 +113,16 @@ class OpenGroupPoller @AssistedInject constructor(
caps = fetched.capabilities
}

val allTasks = mutableListOf<Pair<String, Deferred<Unit>>>()

for (room in rooms) {
val address = Address.Community(serverUrl = server, room = room)
val latestRoomPollInfo = communityDatabase.getRoomInfo(address)
val infoUpdates = latestRoomPollInfo?.details?.infoUpdates ?: 0
val lastMessageServerId = storage.getLastMessageServerID(room, server)

// Poll room info
launch {
allTasks += "polling room info" to async {
val roomInfo = communityApiExecutor.execute(
CommunityApiRequest(
serverBaseUrl = server,
Expand All @@ -137,7 +141,7 @@ class OpenGroupPoller @AssistedInject constructor(
}

// Poll room messages
launch {
allTasks += "polling room messages" to async {
val messages = communityApiExecutor.execute(
CommunityApiRequest(
serverBaseUrl = server,
Expand All @@ -158,7 +162,7 @@ class OpenGroupPoller @AssistedInject constructor(
// We'll only poll our index if we are accepting community requests
if (storage.isCheckingCommunityRequests()) {
// Poll inbox messages
launch {
allTasks += "polling inbox messages" to async {
val inboxMessages = communityApiExecutor.execute(
CommunityApiRequest(
serverBaseUrl = server,
Expand All @@ -175,7 +179,7 @@ class OpenGroupPoller @AssistedInject constructor(
}

// Poll outbox messages regardless because these are messages we sent
launch {
allTasks += "polling outbox messages" to async {
val outboxMessages = communityApiExecutor.execute(
CommunityApiRequest(
serverBaseUrl = server,
Expand All @@ -190,6 +194,27 @@ class OpenGroupPoller @AssistedInject constructor(
handleOutboxMessages(messages = outboxMessages)
}
}

/**
* Await on all tasks and gather the first exception with the rest errors suppressed.
*/
val accumulatedError = allTasks
.fold(null) { acc: Throwable?, (taskName, deferred) ->
val err = runCatching { deferred.await() }
.onFailure { if (it is CancellationException) throw it }
.exceptionOrNull()
?.let { RuntimeException("Error $taskName", it) }

if (err != null) {
acc?.apply { addSuppressed(err) } ?: err
} else {
acc
}
}

if (accumulatedError != null) {
throw accumulatedError
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package org.session.libsession.messaging.sending_receiving.pollers

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
Expand All @@ -12,8 +13,6 @@ import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.sync.Semaphore
import org.session.libsession.utilities.ConfigFactoryProtocol
Expand Down Expand Up @@ -98,17 +97,11 @@ class OpenGroupPollerManager @Inject constructor(
suspend fun pollAllOpenGroupsOnce() {
Log.d(TAG, "Polling all open groups once")
supervisorScope {
pollers.value.map { (server, handle) ->
handle.pollerScope.launch {
runCatching {
handle.poller.manualPollOnce()
}.onFailure {
if (it !is CancellationException) {
Log.e(TAG, "Error polling open group $server", it)
}
}
pollers.value.map { (_, handle) ->
async {
handle.poller.manualPollOnce()
}
}.joinAll()
}.awaitAll()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ class Poller @AssistedInject constructor(
)
)
} catch (e: Exception) {
if (e is CancellationException) throw e

Log.e(logTag, "Error while extending TTL for hashes", e)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.thoughtcrime.securesms.configs

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.async
Expand Down Expand Up @@ -114,6 +115,8 @@ class ConfigUploader @Inject constructor(
pushUserConfigChangesIfNeeded()
}
} catch (e: Exception) {
if (e is CancellationException) throw e

Log.e(TAG, "Failed to push user configs", e)
}
}
Expand Down Expand Up @@ -142,6 +145,8 @@ class ConfigUploader @Inject constructor(
pushGroupConfigsChangesIfNeeded(groupId)
}
} catch (e: Exception) {
if (e is CancellationException) throw e

Log.e(TAG, "Failed to push group configs", e)
}
}
Expand Down