Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ configurations.configureEach {
exclude(module = "commons-logging")
}

val canonicalVersionCode = 439
val canonicalVersionName = "1.31.2"
val canonicalVersionCode = 442
val canonicalVersionName = "1.31.3"

val postFixSize = 10
val abiPostFix = mapOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ interface StorageProtocol {
fun getServerCapabilities(server: String): List<String>?
fun clearServerCapabilities(server: String)

// Open Groups
suspend fun addOpenGroup(urlAsString: String)
fun setOpenGroupServerMessageID(messageID: MessageId, serverID: Long, threadID: Long)

// Open Group Public Keys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class OfficialCommunityRepository @Inject constructor(
})
}
}
.shareIn(scope, SharingStarted.Lazily)
.shareIn(scope, SharingStarted.Lazily, replay = 1)

suspend fun fetchOfficialCommunities(): List<OpenGroupApi.DefaultGroup> {
if (officialCommunitiesCache.replayCache.firstOrNull()?.isFailure == true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,24 @@ object OpenGroupApi {
@SerialName("default_upload")
val defaultUpload: Boolean = false,
val details: RoomInfoDetails = RoomInfoDetails()
)
) {
constructor(details: RoomInfoDetails): this(
token = details.token,
activeUsers = details.activeUsers,
admin = details.admin,
globalAdmin = details.globalAdmin,
moderator = details.moderator,
globalModerator = details.globalModerator,
read = details.read,
defaultRead = details.defaultRead,
defaultAccessible = details.defaultAccessible,
write = details.write,
defaultWrite = details.defaultWrite,
upload = details.upload,
defaultUpload = details.defaultUpload,
details = details
)
}

@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy::class)
@Serializable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ abstract class CommunityApi<ResponseType: Any>(
super.handleErrorResponse(executorContext, baseUrl, response)
}

override fun debugInfo(): String {
return "${this.javaClass.simpleName} for room \"$room\""
}

class CommunityApiDependencies @Inject constructor(
val errorManager: ServerApiErrorManager,
val json: Json,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.session.libsession.messaging.open_groups.api

import dagger.assisted.Assisted
import dagger.assisted.AssistedFactory
import dagger.assisted.AssistedInject
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.decodeFromStream
import org.session.libsession.messaging.open_groups.OpenGroupApi
import org.thoughtcrime.securesms.api.ApiExecutorContext
import org.thoughtcrime.securesms.api.http.HttpResponse

class GetRoomDetailsApi @AssistedInject constructor(
deps: CommunityApiDependencies,
@Assisted override val room: String,
) : CommunityApi<OpenGroupApi.RoomInfoDetails>(deps) {
override val requiresSigning: Boolean
get() = false

override val httpMethod: String
get() = "GET"

override val httpEndpoint: String
get() = "/room/${room}"

override suspend fun handleSuccessResponse(
executorContext: ApiExecutorContext,
baseUrl: String,
response: HttpResponse
): OpenGroupApi.RoomInfoDetails {
@Suppress("OPT_IN_USAGE")
return response.body.asInputStream().use(json::decodeFromStream)
}

@AssistedFactory
interface Factory {
fun create(room: String): GetRoomDetailsApi
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ class MessageSender @Inject constructor(
}

// One-on-One Chats & Closed Groups
private suspend fun sendToSnodeDestination(destination: Destination, message: Message, isSyncMessage: Boolean = false) = supervisorScope {
private suspend fun sendToSnodeDestination(destination: Destination, message: Message, isSyncMessage: Boolean = false) {
// Set the failure handler (need it here already for precondition failure handling)
fun handleFailure(error: Exception) {
handleFailedMessageSend(message, error, isSyncMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,28 @@ package org.session.libsession.messaging.sending_receiving.pollers

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.selects.selectUnbiased
import org.session.libsignal.utilities.Log
import org.thoughtcrime.securesms.util.AppVisibilityManager
import org.thoughtcrime.securesms.util.NetworkConnectivity
import kotlin.time.Clock
import kotlin.time.Duration.Companion.seconds
import kotlin.time.Instant
import kotlin.time.TimeMark
import kotlin.time.TimeSource

private typealias PollRequestCallback<T> = SendChannel<Result<T>>

/**
* Base class for pollers that perform periodic polling operations. These poller will:
Expand All @@ -30,11 +37,12 @@ import kotlin.time.Instant
*/
abstract class BasePoller<T>(
private val networkConnectivity: NetworkConnectivity,
appVisibilityManager: AppVisibilityManager,
private val appVisibilityManager: AppVisibilityManager,
private val scope: CoroutineScope,
) {
protected val logTag: String = this::class.java.simpleName
private val pollMutex = Mutex()

private val manualPollRequestSender: SendChannel<PollRequestCallback<T>>

private val mutablePollState = MutableStateFlow<PollState<T>>(PollState.Idle)

Expand All @@ -44,43 +52,77 @@ abstract class BasePoller<T>(
val pollState: StateFlow<PollState<T>> get() = mutablePollState

init {
val manualPollRequestChannel = Channel<PollRequestCallback<T>>()

manualPollRequestSender = manualPollRequestChannel

scope.launch {
var numConsecutiveFailures = 0
var nextRoutinePollAt: TimeMark? = null

while (true) {
// Wait until the app is in the foreground and we have network connectivity
combine(
appVisibilityManager.isAppVisible.filter { visible ->
if (visible) {
true
} else {
Log.d(logTag, "Polling paused - app in background")
false
}
},
networkConnectivity.networkAvailable.filter { hasNetwork ->
if (hasNetwork) {
true
} else {
Log.d(logTag, "Polling paused - no network connectivity")
false
}
},
transform = { _, _ -> }
).first()

try {
pollOnce("routine")
numConsecutiveFailures = 0
} catch (e: CancellationException) {
throw e
} catch (_: Throwable) {
numConsecutiveFailures += 1
val waitForRoutinePollDeferred = waitForRoutinePoll(nextRoutinePollAt)

val (pollReason, callback) = selectUnbiased {
manualPollRequestChannel.onReceive { callback ->
"manual" to callback
}

waitForRoutinePollDeferred.onAwait {
"routine" to null
}
}

// Clean up the deferred
waitForRoutinePollDeferred.cancel()

val result = runCatching {
pollOnce(pollReason)
}.onSuccess { numConsecutiveFailures = 0 }
.onFailure {
if (it is CancellationException) throw it
numConsecutiveFailures += 1
}

// Must use trySend as we shouldn't be waiting or responsible for
// the manual request (potential) ill-setup.
callback?.trySend(result)

val nextPollSeconds = nextPollDelaySeconds(numConsecutiveFailures)
Log.d(logTag, "Next poll in ${nextPollSeconds}s")
delay(nextPollSeconds * 1000L)
nextRoutinePollAt = TimeSource.Monotonic.markNow().plus(nextPollSeconds.seconds)
}
}
}

private fun waitForRoutinePoll(minStartAt: TimeMark?): Deferred<Unit> {
return scope.async {
combine(
appVisibilityManager.isAppVisible.filter { visible ->
if (visible) {
true
} else {
Log.d(logTag, "Polling paused - app in background")
false
}
},
networkConnectivity.networkAvailable.filter { hasNetwork ->
if (hasNetwork) {
true
} else {
Log.d(logTag, "Polling paused - no network connectivity")
false
}
},
{ _, _ -> }
).first()

// At this point, the criteria for routine poll are all satisfied.

// If we are told we can only start executing from a time, wait until that.
val delayMillis = minStartAt?.elapsedNow()?.let { -it.inWholeMilliseconds }
if (delayMillis != null && delayMillis > 0) {
Log.d(logTag, "Delay next poll for ${delayMillis}ms")
delay(delayMillis)
}
}
}
Expand All @@ -104,34 +146,32 @@ abstract class BasePoller<T>(
/**
* Performs a single polling operation. A failed poll should throw an exception.
*
* @param isFirstPollSinceApoStarted True if this is the first poll since the app started.
* @param isFirstPollSinceAppStarted True if this is the first poll since the app started.
* @return The result of the polling operation.
*/
protected abstract suspend fun doPollOnce(isFirstPollSinceApoStarted: Boolean): T
protected abstract suspend fun doPollOnce(isFirstPollSinceAppStarted: Boolean): T

private suspend fun pollOnce(reason: String): T {
pollMutex.withLock {
val lastState = mutablePollState.value
mutablePollState.value =
PollState.Polling(reason, lastPolledResult = lastState.lastPolledResult)
Log.d(logTag, "Start $reason polling")
val result = runCatching {
doPollOnce(isFirstPollSinceApoStarted = lastState is PollState.Idle)
}
val lastState = mutablePollState.value
mutablePollState.value =
PollState.Polling(reason, lastPolledResult = lastState.lastPolledResult)
Log.d(logTag, "Start $reason polling")
val result = runCatching {
doPollOnce(isFirstPollSinceAppStarted = lastState is PollState.Idle)
}

if (result.isSuccess) {
Log.d(logTag, "$reason polling succeeded")
} else if (result.exceptionOrNull() !is CancellationException) {
Log.e(logTag, "$reason polling failed", result.exceptionOrNull())
}
if (result.isSuccess) {
Log.d(logTag, "$reason polling succeeded")
} else if (result.exceptionOrNull() !is CancellationException) {
Log.e(logTag, "$reason polling failed", result.exceptionOrNull())
}

mutablePollState.value = PollState.Polled(
at = Clock.System.now(),
result = result,
)
mutablePollState.value = PollState.Polled(
at = Clock.System.now(),
result = result,
)

return result.getOrThrow()
}
return result.getOrThrow()
}

/**
Expand All @@ -143,15 +183,9 @@ abstract class BasePoller<T>(
* * This method will throw if the polling operation fails.
*/
suspend fun manualPollOnce(): T {
val resultChannel = Channel<Result<T>>()

scope.launch {
resultChannel.trySend(runCatching {
pollOnce("manual")
})
}

return resultChannel.receive().getOrThrow()
val callback = Channel<Result<T>>(capacity = 1)
manualPollRequestSender.send(callback)
return callback.receive().getOrThrow()
}


Expand Down
Loading
Loading