diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt index be745f66dd..ae34349f97 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt @@ -2,8 +2,10 @@ package org.session.libsession.messaging.sending_receiving.pollers import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Deferred import kotlinx.coroutines.Job +import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch @@ -12,7 +14,6 @@ import network.loki.messenger.libsession_util.GroupKeysConfig import network.loki.messenger.libsession_util.GroupMembersConfig import network.loki.messenger.libsession_util.util.GroupInfo import network.loki.messenger.libsession_util.util.Sodium -import org.session.libsession.database.StorageProtocol import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.groups.GroupManagerV2 import org.session.libsession.messaging.jobs.BatchMessageReceiveJob @@ -27,12 +28,12 @@ import org.session.libsession.snode.model.BatchResponse import org.session.libsession.snode.utilities.await import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsession.utilities.withGroupConfigsOrNull -import org.session.libsignal.messages.SignalServiceGroup import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.Base64 import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Namespace import org.session.libsignal.utilities.Snode +import kotlin.coroutines.cancellation.CancellationException import kotlin.time.Duration.Companion.days class ClosedGroupPoller( @@ -87,7 +88,7 @@ class ClosedGroupPoller( val nextPoll = runCatching { poll(group) } when { nextPoll.isFailure -> { - Log.e("ClosedGroupPoller", "Error polling closed group ${closedGroupSessionId.hexString}: ${nextPoll.exceptionOrNull()}") + Log.e("ClosedGroupPoller", "Error polling closed group", nextPoll.exceptionOrNull()) delay(POLL_INTERVAL) } @@ -110,7 +111,7 @@ class ClosedGroupPoller( job = null } - private suspend fun poll(group: GroupInfo.ClosedGroupInfo): Long? { + private suspend fun poll(group: GroupInfo.ClosedGroupInfo): Long? = coroutineScope { val snode = SnodeAPI.getSingleTargetSnode(closedGroupSessionId.hexString).await() configFactoryProtocol.withGroupConfigsOrNull(closedGroupSessionId) { info, members, keys -> @@ -122,72 +123,130 @@ class ClosedGroupPoller( val authData = group.authData val adminKey = group.adminKey + val groupAccountId = group.groupAccountId val auth = if (authData != null) { GroupSubAccountSwarmAuth( groupKeysConfig = keys, - accountId = group.groupAccountId, + accountId = groupAccountId, authData = authData ) } else if (adminKey != null) { OwnedSwarmAuth.ofClosedGroup( - groupAccountId = group.groupAccountId, + groupAccountId = groupAccountId, adminKey = adminKey ) } else { Log.e("ClosedGroupPoller", "No auth data for group, polling is cancelled") - return null + return@coroutineScope null } - val revokedPoll = SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - snode = snode, - auth = auth, - namespace = Namespace.REVOKED_GROUP_MESSAGES(), - maxSize = null, - ) - val messagePoll = SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - snode = snode, - auth = auth, - namespace = Namespace.CLOSED_GROUP_MESSAGES(), - maxSize = null, - ) - val infoPoll = SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - snode = snode, - auth = auth, - namespace = info.namespace(), - maxSize = null, - ) - val membersPoll = SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - snode = snode, - auth = auth, - namespace = members.namespace(), - maxSize = null, - ) - val keysPoll = SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - snode = snode, - auth = auth, - namespace = keys.namespace(), - maxSize = null, - ) + val pollingTasks = mutableListOf>>() - val requests = mutableListOf(keysPoll, revokedPoll, infoPoll, membersPoll, messagePoll) - - if (hashesToExtend.isNotEmpty()) { - requests += SnodeAPI.buildAuthenticatedAlterTtlBatchRequest( - messageHashes = hashesToExtend.toList(), - auth = auth, - newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds, - extend = true + pollingTasks += "Poll revoked messages" to async { + handleRevoked( + body = SnodeAPI.sendBatchRequest( + groupAccountId, + SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + snode = snode, + auth = auth, + namespace = Namespace.REVOKED_GROUP_MESSAGES(), + maxSize = null, + ), + Map::class.java), + keys = keys ) } - val pollResult = SnodeAPI.getRawBatchResponse( - snode = snode, - publicKey = closedGroupSessionId.hexString, - requests = requests - ).await() + pollingTasks += "Poll group messages" to async { + handleMessages( + body = SnodeAPI.sendBatchRequest( + groupAccountId, + SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + snode = snode, + auth = auth, + namespace = Namespace.CLOSED_GROUP_MESSAGES(), + maxSize = null, + ), + Map::class.java), + snode = snode, + keysConfig = keys + ) + } + + pollingTasks += "Poll group keys config" to async { + handleKeyPoll( + response = SnodeAPI.sendBatchRequest( + groupAccountId, + SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + snode = snode, + auth = auth, + namespace = keys.namespace(), + maxSize = null, + ), + Map::class.java), + keysConfig = keys, + infoConfig = info, + membersConfig = members + ) + } + + pollingTasks += "Poll group info config" to async { + handleInfo( + response = SnodeAPI.sendBatchRequest( + groupAccountId, + SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + snode = snode, + auth = auth, + namespace = Namespace.CLOSED_GROUP_INFO(), + maxSize = null, + ), + Map::class.java), + infoConfig = info + ) + } + + pollingTasks += "Poll group members config" to async { + handleMembers( + response = SnodeAPI.sendBatchRequest( + groupAccountId, + SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + snode = snode, + auth = auth, + namespace = Namespace.CLOSED_GROUP_MEMBERS(), + maxSize = null, + ), + Map::class.java), + membersConfig = members + ) + } + + if (hashesToExtend.isNotEmpty() && adminKey != null) { + pollingTasks += "Extend group config TTL" to async { + SnodeAPI.sendBatchRequest( + groupAccountId, + SnodeAPI.buildAuthenticatedAlterTtlBatchRequest( + messageHashes = hashesToExtend.toList(), + auth = auth, + newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds, + extend = true + ), + ) + } + } + + val errors = pollingTasks.mapNotNull { (name, task) -> + runCatching { task.await() } + .exceptionOrNull() + ?.takeIf { it !is CancellationException } + ?.let { RuntimeException("Error executing: $name", it) } + } + + if (errors.isNotEmpty()) { + throw PollerException("Error polling closed group", errors) + } // If we no longer have a group, stop poller - if (configFactoryProtocol.userGroups?.getClosedGroup(closedGroupSessionId.hexString) == null) return null + if (configFactoryProtocol.userGroups?.getClosedGroup(closedGroupSessionId.hexString) == null) return@coroutineScope null // if poll result body is null here we don't have any things ig if (ENABLE_LOGGING) Log.d( @@ -195,19 +254,6 @@ class ClosedGroupPoller( "Poll results @${SnodeAPI.nowWithOffset}:" ) - requests.asSequence() - .zip((pollResult["results"] as List).asSequence()) - .forEach { (request, response) -> - when (request) { - revokedPoll -> handleRevoked(response, keys) - keysPoll -> handleKeyPoll(response, keys, info, members) - infoPoll -> handleInfo(response, info) - membersPoll -> handleMembers(response, members) - messagePoll -> handleMessages(response, snode, keys) - else -> {} - } - } - val requiresSync = info.needsPush() || members.needsPush() || keys.needsRekey() || keys.pendingConfig() != null @@ -220,15 +266,10 @@ class ClosedGroupPoller( } } - return POLL_INTERVAL // this might change in future + POLL_INTERVAL // this might change in future } - private fun parseMessages(response: RawResponse): List { - val body = response["body"] as? RawResponse - if (body == null) { - if (ENABLE_LOGGING) Log.e("GroupPoller", "Batch parse messages contained no body!") - return emptyList() - } + private fun parseMessages(body: RawResponse): List { val messages = body["messages"] as? List<*> ?: return emptyList() return messages.mapNotNull { messageMap -> val rawMessageAsJSON = messageMap as? Map<*, *> ?: return@mapNotNull null @@ -240,14 +281,9 @@ class ClosedGroupPoller( } } - private fun handleRevoked(response: RawResponse, keys: GroupKeysConfig) { + private suspend fun handleRevoked(body: RawResponse, keys: GroupKeysConfig) { // This shouldn't ever return null at this point val userSessionId = configFactoryProtocol.userSessionId()!! - val body = response["body"] as? RawResponse - if (body == null) { - if (ENABLE_LOGGING) Log.e("GroupPoller", "No revoked messages") - return - } val messages = body["messages"] as? List<*> ?: return Log.w("GroupPoller", "body didn't contain a list of messages") messages.forEach { messageMap -> @@ -270,14 +306,11 @@ class ClosedGroupPoller( if (Sodium.KICKED_REGEX.matches(message)) { val (sessionId, generation) = message.split("-") if (sessionId == userSessionId.hexString && generation.toInt() >= keys.currentGeneration()) { - GlobalScope.launch { - try { - groupManagerV2.handleKicked(closedGroupSessionId) - } catch (e: Exception) { - Log.e("GroupPoller", "Error handling kicked message: $e") - } + try { + groupManagerV2.handleKicked(closedGroupSessionId) + } catch (e: Exception) { + Log.e("GroupPoller", "Error handling kicked message: $e") } - } } } @@ -323,8 +356,7 @@ class ClosedGroupPoller( } } - private fun handleMessages(response: RawResponse, snode: Snode, keysConfig: GroupKeysConfig) { - val body = response["body"] as RawResponse + private fun handleMessages(body: RawResponse, snode: Snode, keysConfig: GroupKeysConfig) { val messages = SnodeAPI.parseRawMessagesResponse( rawResponse = body, snode = snode, diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/PollerException.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/PollerException.kt new file mode 100644 index 0000000000..327965463d --- /dev/null +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/PollerException.kt @@ -0,0 +1,15 @@ +package org.session.libsession.messaging.sending_receiving.pollers + +/** + * Exception thrown by a Poller-family when multiple error could have occurred. + */ +class PollerException(message: String, errors: List) : RuntimeException( + message, + errors.firstOrNull() +) { + init { + errors.asSequence() + .drop(1) + .forEach(this::addSuppressed) + } +} diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index 46b4373a33..c26f7ebd02 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -13,16 +13,11 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.SendChannel -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.scan import kotlinx.coroutines.launch -import kotlinx.coroutines.selects.onTimeout import kotlinx.coroutines.selects.select import kotlinx.coroutines.withContext -import kotlinx.coroutines.withTimeoutOrNull import nl.komponents.kovenant.Promise import nl.komponents.kovenant.all import nl.komponents.kovenant.functional.bind @@ -55,8 +50,6 @@ import kotlin.collections.component1 import kotlin.collections.component2 import kotlin.collections.set import kotlin.properties.Delegates.observable -import kotlin.time.Duration -import kotlin.time.Duration.Companion.milliseconds object SnodeAPI { internal val database: LokiAPIDatabaseProtocol @@ -628,7 +621,7 @@ object SnodeAPI { } if (batch != null) { - launch { + launch batch@{ val accountId = batch.first().accountId val responses = try { getBatchResponse( @@ -640,13 +633,19 @@ object SnodeAPI { for (req in batch) { req.callback.send(Result.failure(e)) } - return@launch + return@batch } for ((req, resp) in batch.zip(responses.results)) { - req.callback.send(kotlin.runCatching { - JsonUtil.fromJson(resp.body, req.responseType) - }) + val result = if (resp.code != 200) { + Result.failure(RuntimeException("Error with code = ${resp.code}, msg = ${resp.body}")) + } else { + runCatching { + JsonUtil.fromJson(resp.body, req.responseType) + } + } + + req.callback.send(result) } // Close all channels in the requests just in case we don't have paired up @@ -745,6 +744,7 @@ object SnodeAPI { buildString { append("expire") append(shortenOrExtend) + append(newExpiry.toString()) messageHashes.forEach(this::append) } }