Group poller improvement

This commit is contained in:
SessionHero01 2024-09-25 14:30:48 +10:00
parent b13b7e647f
commit 7eb615f8dc
No known key found for this signature in database
3 changed files with 146 additions and 99 deletions

View File

@ -2,8 +2,10 @@ package org.session.libsession.messaging.sending_receiving.pollers
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
@ -12,7 +14,6 @@ import network.loki.messenger.libsession_util.GroupKeysConfig
import network.loki.messenger.libsession_util.GroupMembersConfig
import network.loki.messenger.libsession_util.util.GroupInfo
import network.loki.messenger.libsession_util.util.Sodium
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.groups.GroupManagerV2
import org.session.libsession.messaging.jobs.BatchMessageReceiveJob
@ -27,12 +28,12 @@ import org.session.libsession.snode.model.BatchResponse
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.withGroupConfigsOrNull
import org.session.libsignal.messages.SignalServiceGroup
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import org.session.libsignal.utilities.Snode
import kotlin.coroutines.cancellation.CancellationException
import kotlin.time.Duration.Companion.days
class ClosedGroupPoller(
@ -87,7 +88,7 @@ class ClosedGroupPoller(
val nextPoll = runCatching { poll(group) }
when {
nextPoll.isFailure -> {
Log.e("ClosedGroupPoller", "Error polling closed group ${closedGroupSessionId.hexString}: ${nextPoll.exceptionOrNull()}")
Log.e("ClosedGroupPoller", "Error polling closed group", nextPoll.exceptionOrNull())
delay(POLL_INTERVAL)
}
@ -110,7 +111,7 @@ class ClosedGroupPoller(
job = null
}
private suspend fun poll(group: GroupInfo.ClosedGroupInfo): Long? {
private suspend fun poll(group: GroupInfo.ClosedGroupInfo): Long? = coroutineScope {
val snode = SnodeAPI.getSingleTargetSnode(closedGroupSessionId.hexString).await()
configFactoryProtocol.withGroupConfigsOrNull(closedGroupSessionId) { info, members, keys ->
@ -122,72 +123,130 @@ class ClosedGroupPoller(
val authData = group.authData
val adminKey = group.adminKey
val groupAccountId = group.groupAccountId
val auth = if (authData != null) {
GroupSubAccountSwarmAuth(
groupKeysConfig = keys,
accountId = group.groupAccountId,
accountId = groupAccountId,
authData = authData
)
} else if (adminKey != null) {
OwnedSwarmAuth.ofClosedGroup(
groupAccountId = group.groupAccountId,
groupAccountId = groupAccountId,
adminKey = adminKey
)
} else {
Log.e("ClosedGroupPoller", "No auth data for group, polling is cancelled")
return null
return@coroutineScope null
}
val revokedPoll = SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = auth,
namespace = Namespace.REVOKED_GROUP_MESSAGES(),
maxSize = null,
)
val messagePoll = SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = auth,
namespace = Namespace.CLOSED_GROUP_MESSAGES(),
maxSize = null,
)
val infoPoll = SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = auth,
namespace = info.namespace(),
maxSize = null,
)
val membersPoll = SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = auth,
namespace = members.namespace(),
maxSize = null,
)
val keysPoll = SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = auth,
namespace = keys.namespace(),
maxSize = null,
)
val pollingTasks = mutableListOf<Pair<String, Deferred<*>>>()
val requests = mutableListOf(keysPoll, revokedPoll, infoPoll, membersPoll, messagePoll)
if (hashesToExtend.isNotEmpty()) {
requests += SnodeAPI.buildAuthenticatedAlterTtlBatchRequest(
messageHashes = hashesToExtend.toList(),
auth = auth,
newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds,
extend = true
pollingTasks += "Poll revoked messages" to async {
handleRevoked(
body = SnodeAPI.sendBatchRequest(
groupAccountId,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = auth,
namespace = Namespace.REVOKED_GROUP_MESSAGES(),
maxSize = null,
),
Map::class.java),
keys = keys
)
}
val pollResult = SnodeAPI.getRawBatchResponse(
snode = snode,
publicKey = closedGroupSessionId.hexString,
requests = requests
).await()
pollingTasks += "Poll group messages" to async {
handleMessages(
body = SnodeAPI.sendBatchRequest(
groupAccountId,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = auth,
namespace = Namespace.CLOSED_GROUP_MESSAGES(),
maxSize = null,
),
Map::class.java),
snode = snode,
keysConfig = keys
)
}
pollingTasks += "Poll group keys config" to async {
handleKeyPoll(
response = SnodeAPI.sendBatchRequest(
groupAccountId,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = auth,
namespace = keys.namespace(),
maxSize = null,
),
Map::class.java),
keysConfig = keys,
infoConfig = info,
membersConfig = members
)
}
pollingTasks += "Poll group info config" to async {
handleInfo(
response = SnodeAPI.sendBatchRequest(
groupAccountId,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = auth,
namespace = Namespace.CLOSED_GROUP_INFO(),
maxSize = null,
),
Map::class.java),
infoConfig = info
)
}
pollingTasks += "Poll group members config" to async {
handleMembers(
response = SnodeAPI.sendBatchRequest(
groupAccountId,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = auth,
namespace = Namespace.CLOSED_GROUP_MEMBERS(),
maxSize = null,
),
Map::class.java),
membersConfig = members
)
}
if (hashesToExtend.isNotEmpty() && adminKey != null) {
pollingTasks += "Extend group config TTL" to async {
SnodeAPI.sendBatchRequest(
groupAccountId,
SnodeAPI.buildAuthenticatedAlterTtlBatchRequest(
messageHashes = hashesToExtend.toList(),
auth = auth,
newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds,
extend = true
),
)
}
}
val errors = pollingTasks.mapNotNull { (name, task) ->
runCatching { task.await() }
.exceptionOrNull()
?.takeIf { it !is CancellationException }
?.let { RuntimeException("Error executing: $name", it) }
}
if (errors.isNotEmpty()) {
throw PollerException("Error polling closed group", errors)
}
// If we no longer have a group, stop poller
if (configFactoryProtocol.userGroups?.getClosedGroup(closedGroupSessionId.hexString) == null) return null
if (configFactoryProtocol.userGroups?.getClosedGroup(closedGroupSessionId.hexString) == null) return@coroutineScope null
// if poll result body is null here we don't have any things ig
if (ENABLE_LOGGING) Log.d(
@ -195,19 +254,6 @@ class ClosedGroupPoller(
"Poll results @${SnodeAPI.nowWithOffset}:"
)
requests.asSequence()
.zip((pollResult["results"] as List<RawResponse>).asSequence())
.forEach { (request, response) ->
when (request) {
revokedPoll -> handleRevoked(response, keys)
keysPoll -> handleKeyPoll(response, keys, info, members)
infoPoll -> handleInfo(response, info)
membersPoll -> handleMembers(response, members)
messagePoll -> handleMessages(response, snode, keys)
else -> {}
}
}
val requiresSync =
info.needsPush() || members.needsPush() || keys.needsRekey() || keys.pendingConfig() != null
@ -220,15 +266,10 @@ class ClosedGroupPoller(
}
}
return POLL_INTERVAL // this might change in future
POLL_INTERVAL // this might change in future
}
private fun parseMessages(response: RawResponse): List<ParsedRawMessage> {
val body = response["body"] as? RawResponse
if (body == null) {
if (ENABLE_LOGGING) Log.e("GroupPoller", "Batch parse messages contained no body!")
return emptyList()
}
private fun parseMessages(body: RawResponse): List<ParsedRawMessage> {
val messages = body["messages"] as? List<*> ?: return emptyList()
return messages.mapNotNull { messageMap ->
val rawMessageAsJSON = messageMap as? Map<*, *> ?: return@mapNotNull null
@ -240,14 +281,9 @@ class ClosedGroupPoller(
}
}
private fun handleRevoked(response: RawResponse, keys: GroupKeysConfig) {
private suspend fun handleRevoked(body: RawResponse, keys: GroupKeysConfig) {
// This shouldn't ever return null at this point
val userSessionId = configFactoryProtocol.userSessionId()!!
val body = response["body"] as? RawResponse
if (body == null) {
if (ENABLE_LOGGING) Log.e("GroupPoller", "No revoked messages")
return
}
val messages = body["messages"] as? List<*>
?: return Log.w("GroupPoller", "body didn't contain a list of messages")
messages.forEach { messageMap ->
@ -270,14 +306,11 @@ class ClosedGroupPoller(
if (Sodium.KICKED_REGEX.matches(message)) {
val (sessionId, generation) = message.split("-")
if (sessionId == userSessionId.hexString && generation.toInt() >= keys.currentGeneration()) {
GlobalScope.launch {
try {
groupManagerV2.handleKicked(closedGroupSessionId)
} catch (e: Exception) {
Log.e("GroupPoller", "Error handling kicked message: $e")
}
try {
groupManagerV2.handleKicked(closedGroupSessionId)
} catch (e: Exception) {
Log.e("GroupPoller", "Error handling kicked message: $e")
}
}
}
}
@ -323,8 +356,7 @@ class ClosedGroupPoller(
}
}
private fun handleMessages(response: RawResponse, snode: Snode, keysConfig: GroupKeysConfig) {
val body = response["body"] as RawResponse
private fun handleMessages(body: RawResponse, snode: Snode, keysConfig: GroupKeysConfig) {
val messages = SnodeAPI.parseRawMessagesResponse(
rawResponse = body,
snode = snode,

View File

@ -0,0 +1,15 @@
package org.session.libsession.messaging.sending_receiving.pollers
/**
* Exception thrown by a Poller-family when multiple error could have occurred.
*/
class PollerException(message: String, errors: List<Throwable>) : RuntimeException(
message,
errors.firstOrNull()
) {
init {
errors.asSequence()
.drop(1)
.forEach(this::addSuppressed)
}
}

View File

@ -13,16 +13,11 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.all
import nl.komponents.kovenant.functional.bind
@ -55,8 +50,6 @@ import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.collections.set
import kotlin.properties.Delegates.observable
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
object SnodeAPI {
internal val database: LokiAPIDatabaseProtocol
@ -628,7 +621,7 @@ object SnodeAPI {
}
if (batch != null) {
launch {
launch batch@{
val accountId = batch.first().accountId
val responses = try {
getBatchResponse(
@ -640,13 +633,19 @@ object SnodeAPI {
for (req in batch) {
req.callback.send(Result.failure(e))
}
return@launch
return@batch
}
for ((req, resp) in batch.zip(responses.results)) {
req.callback.send(kotlin.runCatching {
JsonUtil.fromJson(resp.body, req.responseType)
})
val result = if (resp.code != 200) {
Result.failure(RuntimeException("Error with code = ${resp.code}, msg = ${resp.body}"))
} else {
runCatching {
JsonUtil.fromJson(resp.body, req.responseType)
}
}
req.callback.send(result)
}
// Close all channels in the requests just in case we don't have paired up
@ -745,6 +744,7 @@ object SnodeAPI {
buildString {
append("expire")
append(shortenOrExtend)
append(newExpiry.toString())
messageHashes.forEach(this::append)
}
}