feat: finish implementing scheduling sync and pollers, destination based config sync job

This commit is contained in:
0x330a
2023-09-18 16:13:25 +10:00
parent 43e72550f9
commit ec02087c6b
11 changed files with 113 additions and 68 deletions

View File

@@ -2,15 +2,23 @@ package org.session.libsession.messaging.jobs
import network.loki.messenger.libsession_util.Config
import network.loki.messenger.libsession_util.ConfigBase
import network.loki.messenger.libsession_util.ConfigBase.Companion.protoKindFor
import network.loki.messenger.libsession_util.GroupKeysConfig
import network.loki.messenger.libsession_util.util.ConfigPush
import nl.komponents.kovenant.functional.bind
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.messaging.messages.control.SharedConfigurationMessage
import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeAPI.SnodeBatchRequestInfo
import org.session.libsession.snode.SnodeMessage
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.SessionId
import java.util.concurrent.atomic.AtomicBoolean
class InvalidDestination: Exception("Trying to push configs somewhere other than our swarm or a closed group")
@@ -30,8 +38,65 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
data class SyncInformation(val configs: List<ConfigMessageInformation>, val toDelete: List<String>)
private fun destinationConfigs(delegate: JobDelegate, configFactoryProtocol: ConfigFactoryProtocol): SyncInformation {
TODO()
private fun destinationConfigs(delegate: JobDelegate,
dispatcherName: String,
configFactoryProtocol: ConfigFactoryProtocol): SyncInformation {
val toDelete = mutableListOf<String>()
val configsRequiringPush = if (destination is Destination.ClosedGroup) {
val sentTimestamp = SnodeAPI.nowWithOffset
// destination is a closed group, get all configs requiring push here
val groupId = SessionId.from(destination.publicKey)
val signingKey = configFactoryProtocol.userGroups!!.getClosedGroup(destination.publicKey)!!.signingKey()
val keys = configFactoryProtocol.getGroupKeysConfig(groupId)!!
val info = configFactoryProtocol.getGroupInfoConfig(groupId)!!
val members = configFactoryProtocol.getGroupMemberConfig(groupId)!!
val requiringPush = listOf(keys, info, members).filter {
when (it) {
is GroupKeysConfig -> it.pendingConfig()?.isNotEmpty() == true
is ConfigBase -> it.needsPush()
else -> false
}
}
// free the objects that were created but won't be used after this point
// in case any of the configs don't need pushing, they won't be freed later
(listOf(keys,info,members) subtract requiringPush).forEach(Config::free)
requiringPush.map { config ->
val (push, seqNo, obsoleteHashes) = if (config is GroupKeysConfig) {
ConfigPush(config.pendingConfig()!!, 0, emptyList()) // should not be null from filter step previous
} else if (config is ConfigBase) {
config.push()
} else throw IllegalArgumentException("Got a non group keys or config base object for config sync")
toDelete += obsoleteHashes
val message = SnodeMessage(destination.publicKey, Base64.encodeBytes(push), SnodeMessage.CONFIG_TTL, sentTimestamp)
ConfigMessageInformation(SnodeAPI.buildAuthenticatedStoreBatchInfo(config.namespace(), message, signingKey), config, seqNo)
}
} else {
// assume our own user as check already takes place in `execute` for our own key if contact
configFactoryProtocol.getUserConfigs().filter { it.needsPush() }.map { config ->
val (bytes, seqNo, obsoleteHashes) = config.push()
toDelete += obsoleteHashes
val message = messageForConfig(config, bytes, seqNo)
?: throw NullPointerException("SnodeBatchRequest message was null, check group keys exists")
ConfigMessageInformation(message, config, seqNo)
}
}
return SyncInformation(configsRequiringPush, toDelete)
}
private fun messageForConfig(
config: ConfigBase,
bytes: ByteArray,
seqNo: Long
): SnodeBatchRequestInfo? {
val message = SharedConfigurationMessage(config.protoKindFor(), bytes, seqNo)
val snodeMessage = MessageSender.buildWrappedMessageToSnode(destination, message, true)
return SnodeAPI.buildAuthenticatedStoreBatchInfo(config.namespace(), snodeMessage)
}
override suspend fun execute(dispatcherName: String) {
@@ -39,26 +104,17 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
val userPublicKey = storage.getUserPublicKey()
val delegate = delegate ?: return Log.e("ConfigurationSyncJob", "No Delegate")
if (destination is Destination.Contact && destination.publicKey != userPublicKey) {
return delegate.handleJobFailedPermanently(this, dispatcherName, InvalidContactDestination())
} else if (destination !is Destination.ClosedGroup) {
if (destination !is Destination.ClosedGroup && (destination !is Destination.Contact || destination.publicKey != userPublicKey)) {
return delegate.handleJobFailedPermanently(this, dispatcherName, InvalidDestination())
}
// configFactory singleton instance will come in handy for modifying hashes and fetching configs for namespace etc
val configFactory = MessagingModuleConfiguration.shared.configFactory
// **** start user ****
// get latest states, filter out configs that don't need push
val configsRequiringPush = configFactory.getUserConfigs().filter { config -> config.needsPush() }
// don't run anything if we don't need to push anything
if (configsRequiringPush.isEmpty()) return delegate.handleJobSucceeded(this, dispatcherName)
// **** end user ****
// allow null results here so the list index matches configsRequiringPush
val sentTimestamp: Long = SnodeAPI.nowWithOffset
val (batchObjects, toDeleteHashes) = destinationConfigs(delegate, configFactory)
val (batchObjects, toDeleteHashes) = destinationConfigs(delegate, dispatcherName, configFactory)
if (batchObjects.isEmpty()) return delegate.handleJobSucceeded(this, dispatcherName)
val toDeleteRequest = toDeleteHashes.let { toDeleteFromAllNamespaces ->
if (toDeleteFromAllNamespaces.isEmpty()) null
@@ -86,22 +142,6 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
val rawResponses = batchResponse.get()
@Suppress("UNCHECKED_CAST")
val responseList = (rawResponses["results"] as List<RawResponse>)
// we are always adding in deletions at the end
val deletionResponse = if (toDeleteRequest != null && responseList.isNotEmpty()) responseList.last() else null
val deletedHashes = deletionResponse?.let {
@Suppress("UNCHECKED_CAST")
// get the sub-request body
(deletionResponse["body"] as? RawResponse)?.let { body ->
// get the swarm dict
body["swarm"] as? RawResponse
}?.mapValues { (_, swarmDict) ->
// get the deleted values from dict
((swarmDict as? RawResponse)?.get("deleted") as? List<String>)?.toSet() ?: emptySet()
}?.values?.reduce { acc, strings ->
// create an intersection of all deleted hashes (common between all swarm nodes)
acc intersect strings
}
} ?: emptySet()
// at this point responseList index should line up with configsRequiringPush index
batchObjects.forEachIndexed { index, (message, config, seqNo) ->
@@ -125,6 +165,10 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
if (config is ConfigBase && config.needsDump()) { // usually this will be true?
configFactory.persist(config, (message.params["timestamp"] as String).toLong())
}
if (destination is Destination.ClosedGroup) {
config.free() // after they are used, free the temporary group configs
}
}
} catch (e: Exception) {
Log.e(TAG, "Error performing batch request", e)
@@ -137,10 +181,6 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
}
}
private fun getUserSyncInformation(delegate: JobDelegate) {
val userEdKeyPair = MessagingModuleConfiguration.shared.getUserED25519KeyPair()
}
fun Destination.destinationPublicKey(): String = when (this) {
is Destination.Contact -> publicKey
is Destination.ClosedGroup -> publicKey

View File

@@ -14,6 +14,7 @@ import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.BatchMessageReceiveJob
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveParameters
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.utilities.ConfigFactoryProtocol
@@ -101,6 +102,8 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
val membersIndex = 2
val messageIndex = 3
val requiresSync = info.needsPush() || members.needsPush() || keys.needsRekey() || keys.pendingConfig() != null
val messagePoll = SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode,
closedGroupSessionId.hexString(),
@@ -154,6 +157,9 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
info.free()
members.free()
if (requiresSync) {
configFactoryProtocol.scheduleUpdate(Destination.ClosedGroup(closedGroupSessionId.hexString()))
}
} catch (e: Exception) {
if (ENABLE_LOGGING) Log.e("GroupPoller", "Polling failed for group", e)
return POLL_INTERVAL

View File

@@ -30,4 +30,8 @@ data class SnodeMessage(
"timestamp" to timestamp.toString(),
)
}
companion object {
const val CONFIG_TTL: Long = 30 * 24 * 60 * 60 * 1000L
}
}

View File

@@ -9,6 +9,7 @@ import network.loki.messenger.libsession_util.GroupKeysConfig
import network.loki.messenger.libsession_util.GroupMembersConfig
import network.loki.messenger.libsession_util.UserGroupsConfig
import network.loki.messenger.libsession_util.UserProfile
import org.session.libsession.messaging.messages.Destination
import org.session.libsignal.utilities.SessionId
interface ConfigFactoryProtocol {
@@ -31,6 +32,8 @@ interface ConfigFactoryProtocol {
groupInfo: GroupInfoConfig,
groupMembers: GroupMembersConfig
)
fun scheduleUpdate(destination: Destination)
}
interface ConfigFactoryUpdateListener {