Config revamp WIP

This commit is contained in:
SessionHero01
2024-09-26 15:43:45 +10:00
parent 7eb615f8dc
commit 771d63e902
58 changed files with 1831 additions and 2532 deletions

View File

@@ -68,7 +68,6 @@ interface StorageProtocol {
fun getMessageSendJob(messageSendJobID: String): MessageSendJob?
fun getMessageReceiveJob(messageReceiveJobID: String): Job?
fun getGroupAvatarDownloadJob(server: String, room: String, imageId: String?): Job?
fun getConfigSyncJob(destination: Destination): Job?
fun resumeMessageSendJobIfNeeded(messageSendJobID: String)
fun isJobCanceled(job: Job): Boolean
fun cancelPendingMessageSendJobs(threadID: Long)
@@ -269,7 +268,6 @@ interface StorageProtocol {
)
// Shared configs
fun notifyConfigUpdates(forConfigObject: Config, messageTimestamp: Long)
fun conversationInConfig(publicKey: String?, groupPublicKey: String?, openGroupId: String?, visibleOnly: Boolean): Boolean
fun canPerformConfigChange(variant: String, publicKey: String, changeTimestampMs: Long): Boolean
fun isCheckingCommunityRequests(): Boolean

View File

@@ -0,0 +1,133 @@
package org.session.libsession.messaging.configs
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import network.loki.messenger.libsession_util.MutableConfig
import network.loki.messenger.libsession_util.util.ConfigPush
import org.session.libsession.database.StorageProtocol
import org.session.libsession.database.userAuth
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeMessage
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.ConfigUpdateNotification
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log
class ConfigSyncHandler(
private val configFactory: ConfigFactoryProtocol,
private val storageProtocol: StorageProtocol,
@Suppress("OPT_IN_USAGE") scope: CoroutineScope = GlobalScope,
) {
init {
scope.launch {
configFactory.configUpdateNotifications.collect { changes ->
try {
when (changes) {
is ConfigUpdateNotification.GroupConfigsDeleted -> {}
is ConfigUpdateNotification.GroupConfigsUpdated -> {
pushGroupConfigsChangesIfNeeded(changes.groupId)
}
ConfigUpdateNotification.UserConfigs -> pushUserConfigChangesIfNeeded()
}
} catch (e: Exception) {
Log.e("ConfigSyncHandler", "Error handling config update", e)
}
}
}
}
private suspend fun pushGroupConfigsChangesIfNeeded(groupId: AccountId): Unit = coroutineScope {
}
private suspend fun pushUserConfigChangesIfNeeded(): Unit = coroutineScope {
val userAuth = requireNotNull(storageProtocol.userAuth) {
"Current user not available"
}
data class PushInformation(
val namespace: Int,
val configClass: Class<out MutableConfig>,
val push: ConfigPush,
)
// Gather all the user configs that need to be pushed
val pushes = configFactory.withMutableUserConfigs { configs ->
configs.allConfigs()
.filter { it.needsPush() }
.map { config ->
PushInformation(
namespace = config.namespace(),
configClass = config.javaClass,
push = config.push(),
)
}
.toList()
}
Log.d("ConfigSyncHandler", "Pushing ${pushes.size} configs")
val snode = SnodeAPI.getSingleTargetSnode(userAuth.accountId.hexString).await()
val pushTasks = pushes.map { info ->
val calls = buildList {
this += SnodeAPI.buildAuthenticatedStoreBatchInfo(
info.namespace,
SnodeMessage(
userAuth.accountId.hexString,
Base64.encodeBytes(info.push.config),
SnodeMessage.CONFIG_TTL,
SnodeAPI.nowWithOffset,
),
userAuth
)
if (info.push.obsoleteHashes.isNotEmpty()) {
this += SnodeAPI.buildAuthenticatedDeleteBatchInfo(
messageHashes = info.push.obsoleteHashes,
auth = userAuth,
)
}
}
async {
val responses = SnodeAPI.getBatchResponse(
snode = snode,
publicKey = userAuth.accountId.hexString,
requests = calls,
sequence = true
)
val firstError = responses.results.firstOrNull { !it.isSuccessful }
check(firstError == null) {
"Failed to push config change due to error: ${firstError?.body}"
}
val hash = responses.results.first().body.get("hash").asText()
require(hash.isNotEmpty()) {
"Missing server hash for pushed config"
}
info to hash
}
}
val pushResults = pushTasks.awaitAll().associateBy { it.first.configClass }
Log.d("ConfigSyncHandler", "Pushed ${pushResults.size} configs")
configFactory.withMutableUserConfigs { configs ->
configs.allConfigs()
.mapNotNull { config -> pushResults[config.javaClass]?.let { Triple(config, it.first, it.second) } }
.forEach { (config, info, hash) ->
config.confirmPushed(info.push.seqNo, hash)
}
}
}
}

View File

@@ -8,7 +8,7 @@ import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.launch
import network.loki.messenger.libsession_util.GroupKeysConfig
import network.loki.messenger.libsession_util.ReadableGroupKeysConfig
import network.loki.messenger.libsession_util.util.GroupMember
import network.loki.messenger.libsession_util.util.Sodium
import org.session.libsession.messaging.messages.Destination
@@ -17,8 +17,8 @@ import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.snode.OwnedSwarmAuth
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeMessage
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.withGroupConfigsOrNull
import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateMessage
import org.session.libsignal.utilities.AccountId
@@ -61,12 +61,8 @@ class RemoveGroupMemberHandler(
}
private suspend fun processPendingMemberRemoval() {
val userGroups = checkNotNull(configFactory.userGroups) {
"User groups config is null"
}
// Run the removal process for each group in parallel
val removalTasks = userGroups.allClosedGroupInfo()
val removalTasks = configFactory.withUserConfigs { it.userGroups.allClosedGroupInfo() }
.asSequence()
.filter { it.hasAdminKey() }
.associate { group ->
@@ -89,7 +85,7 @@ class RemoveGroupMemberHandler(
}
}
private fun processPendingRemovalsForGroup(
private suspend fun processPendingRemovalsForGroup(
groupAccountId: AccountId,
groupName: String,
adminKey: ByteArray
@@ -100,11 +96,11 @@ class RemoveGroupMemberHandler(
ed25519PrivateKey = adminKey
)
configFactory.withGroupConfigsOrNull(groupAccountId) withConfig@ { info, members, keys ->
val pendingRemovals = members.all().filter { it.removed }
val batchCalls = configFactory.withGroupConfigs(groupAccountId) { configs ->
val pendingRemovals = configs.groupMembers.all().filter { it.removed }
if (pendingRemovals.isEmpty()) {
// Skip if there are no pending removals
return@withConfig
return@withGroupConfigs emptyList()
}
Log.d(TAG, "Processing ${pendingRemovals.size} pending removals for group $groupName")
@@ -113,28 +109,28 @@ class RemoveGroupMemberHandler(
// 1. Revoke the member's sub key (by adding the key to a "revoked list" under the hood)
// 2. Send a message to a special namespace to inform the removed members they have been removed
// 3. Conditionally, delete removed-members' messages from the group's message store, if that option is selected by the actioning admin
val seqCalls = ArrayList<SnodeAPI.SnodeBatchRequestInfo>(3)
val calls = ArrayList<SnodeAPI.SnodeBatchRequestInfo>(3)
// Call No 1. Revoke sub-key. This call is crucial and must not fail for the rest of the operation to be successful.
seqCalls += checkNotNull(
calls += checkNotNull(
SnodeAPI.buildAuthenticatedRevokeSubKeyBatchRequest(
groupAdminAuth = swarmAuth,
subAccountTokens = pendingRemovals.map {
keys.getSubAccountToken(AccountId(it.sessionId))
configs.groupKeys.getSubAccountToken(AccountId(it.sessionId))
}
)
) { "Fail to create a revoke request" }
// Call No 2. Send a message to the removed members
seqCalls += SnodeAPI.buildAuthenticatedStoreBatchInfo(
calls += SnodeAPI.buildAuthenticatedStoreBatchInfo(
namespace = Namespace.REVOKED_GROUP_MESSAGES(),
message = buildGroupKickMessage(groupAccountId.hexString, pendingRemovals, keys, adminKey),
message = buildGroupKickMessage(groupAccountId.hexString, pendingRemovals, configs.groupKeys, adminKey),
auth = swarmAuth,
)
// Call No 3. Conditionally remove the message from the group's message store
if (pendingRemovals.any { it.shouldRemoveMessages }) {
seqCalls += SnodeAPI.buildAuthenticatedStoreBatchInfo(
calls += SnodeAPI.buildAuthenticatedStoreBatchInfo(
namespace = Namespace.CLOSED_GROUP_MESSAGES(),
message = buildDeleteGroupMemberContentMessage(
groupAccountId = groupAccountId.hexString,
@@ -147,9 +143,17 @@ class RemoveGroupMemberHandler(
)
}
// Make the call:
SnodeAPI.getSingleTargetSnode(groupAccountId.hexString)
calls
}
if (batchCalls.isEmpty()) {
return
}
val node = SnodeAPI.getSingleTargetSnode(groupAccountId.hexString).await()
SnodeAPI.getBatchResponse(node, groupAccountId.hexString, batchCalls, true)
//TODO: Handle message removal
}
private fun buildDeleteGroupMemberContentMessage(
@@ -178,7 +182,7 @@ class RemoveGroupMemberHandler(
private fun buildGroupKickMessage(
groupAccountId: String,
pendingRemovals: List<GroupMember>,
keys: GroupKeysConfig,
keys: ReadableGroupKeysConfig,
adminKey: ByteArray
) = SnodeMessage(
recipient = groupAccountId,

View File

@@ -1,338 +0,0 @@
package org.session.libsession.messaging.jobs
import network.loki.messenger.libsession_util.Config
import network.loki.messenger.libsession_util.ConfigBase
import network.loki.messenger.libsession_util.GroupKeysConfig
import nl.komponents.kovenant.functional.bind
import org.session.libsession.database.userAuth
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.snode.OwnedSwarmAuth
import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeAPI.SnodeBatchRequestInfo
import org.session.libsession.snode.SnodeMessage
import org.session.libsession.snode.SwarmAuth
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log
import java.util.concurrent.atomic.AtomicBoolean
class InvalidDestination :
Exception("Trying to push configs somewhere other than our swarm or a closed group")
// only contact (self) and closed group destinations will be supported
data class ConfigurationSyncJob(val destination: Destination) : Job {
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
override val maxFailureCount: Int = 10
val shouldRunAgain = AtomicBoolean(false)
data class ConfigMessageInformation(
val batch: SnodeBatchRequestInfo,
val config: Config,
val seqNo: Long?
) // seqNo will be null for keys type
data class SyncInformation(
val configs: List<ConfigMessageInformation>,
val toDelete: List<String>
)
private fun destinationConfigs(
configFactoryProtocol: ConfigFactoryProtocol
): SyncInformation {
val toDelete = mutableListOf<String>()
val configsRequiringPush =
if (destination is Destination.ClosedGroup) {
// destination is a closed group, get all configs requiring push here
val groupId = AccountId(destination.publicKey)
// Get the signing key for pushing configs
val signingKey = configFactoryProtocol
.userGroups?.getClosedGroup(destination.publicKey)?.adminKey
if (signingKey?.isNotEmpty() == true) {
val info = configFactoryProtocol.getGroupInfoConfig(groupId)!!
val members = configFactoryProtocol.getGroupMemberConfig(groupId)!!
val keys = configFactoryProtocol.getGroupKeysConfig(
groupId,
info,
members,
false
)!!
val requiringPush =
listOf(keys, info, members).filter {
when (it) {
is GroupKeysConfig -> it.pendingConfig()?.isNotEmpty() == true
is ConfigBase -> it.needsPush()
else -> false
}
}
// free the objects that were created but won't be used after this point
// in case any of the configs don't need pushing, they won't be freed later
(listOf(keys, info, members) subtract requiringPush).forEach(Config::free)
val groupAuth = OwnedSwarmAuth.ofClosedGroup(groupId, signingKey)
requiringPush.mapNotNull { config ->
if (config is GroupKeysConfig) {
config.messageInformation(groupAuth)
} else if (config is ConfigBase) {
config.messageInformation(toDelete, groupAuth)
} else {
Log.e("ConfigurationSyncJob", "Tried to create a message from an unknown config")
null
}
}
} else emptyList()
} else if (destination is Destination.Contact) {
val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth) {
"No user auth for syncing user config"
}
// assume our own user as check already takes place in `execute` for our own key
// if contact
configFactoryProtocol.getUserConfigs().filter { it.needsPush() }.map { config ->
config.messageInformation(toDelete, userAuth)
}
} else throw InvalidDestination()
return SyncInformation(configsRequiringPush, toDelete)
}
override suspend fun execute(dispatcherName: String) {
val storage = MessagingModuleConfiguration.shared.storage
val userPublicKey = storage.getUserPublicKey()
val delegate = delegate ?: return Log.e("ConfigurationSyncJob", "No Delegate")
if (destination !is Destination.ClosedGroup &&
(destination !is Destination.Contact ||
destination.publicKey != userPublicKey)
) {
return delegate.handleJobFailedPermanently(this, dispatcherName, InvalidDestination())
}
// configFactory singleton instance will come in handy for modifying hashes and fetching
// configs for namespace etc
val configFactory = MessagingModuleConfiguration.shared.configFactory
// allow null results here so the list index matches configsRequiringPush
val (batchObjects, toDeleteHashes) =
destinationConfigs(configFactory)
if (batchObjects.isEmpty()) return delegate.handleJobSucceeded(this, dispatcherName)
val toDeleteRequest =
toDeleteHashes.let { toDeleteFromAllNamespaces ->
if (toDeleteFromAllNamespaces.isEmpty()) null
else if (destination is Destination.ClosedGroup) {
// Build sign callback for group's admin key
val signingKey =
configFactory.userGroups
?.getClosedGroup(destination.publicKey)
?.adminKey ?: return@let null
// Destination is a closed group swarm, build with signCallback
SnodeAPI.buildAuthenticatedDeleteBatchInfo(
OwnedSwarmAuth.ofClosedGroup(
groupAccountId = AccountId(destination.publicKey),
adminKey = signingKey,
),
toDeleteFromAllNamespaces,
)
} else {
// Destination is our own swarm
val userAuth = MessagingModuleConfiguration.shared.storage.userAuth
if (userAuth == null) {
delegate.handleJobFailedPermanently(
this,
dispatcherName,
IllegalStateException("No user auth for syncing user config")
)
return
}
SnodeAPI.buildAuthenticatedDeleteBatchInfo(
auth = userAuth,
messageHashes = toDeleteFromAllNamespaces
)
}
}
val allRequests = mutableListOf<SnodeBatchRequestInfo>()
allRequests += batchObjects.map { (request) -> request }
// add in the deletion if we have any hashes
if (toDeleteRequest != null) {
allRequests += toDeleteRequest
Log.d(TAG, "Including delete request for current hashes")
}
val batchResponse =
SnodeAPI.getSingleTargetSnode(destination.destinationPublicKey()).bind { snode ->
SnodeAPI.getRawBatchResponse(
snode,
destination.destinationPublicKey(),
allRequests,
sequence = true
)
}
try {
val rawResponses = batchResponse.get()
@Suppress("UNCHECKED_CAST")
val responseList = (rawResponses["results"] as List<RawResponse>)
// at this point responseList index should line up with configsRequiringPush index
batchObjects.forEachIndexed { index, (message, config, seqNo) ->
val response = responseList[index]
val responseBody = response["body"] as? RawResponse
val insertHash =
responseBody?.get("hash") as? String
?: run {
Log.w(
TAG,
"No hash returned for the configuration in namespace ${config.namespace()}"
)
return@forEachIndexed
}
Log.d(TAG, "Hash ${insertHash.take(4)} returned from store request for new config")
// confirm pushed seqno
if (config is ConfigBase) {
seqNo?.let { config.confirmPushed(it, insertHash) }
}
Log.d(
TAG,
"Successfully removed the deleted hashes from ${config.javaClass.simpleName}"
)
// dump and write config after successful
if (config is ConfigBase && config.needsDump()) { // usually this will be true? ))
val groupPubKey = if (destination is Destination.ClosedGroup) destination.publicKey else null
configFactory.persist(config, message.params["timestamp"] as Long, groupPubKey)
} else if (config is GroupKeysConfig && config.needsDump()) {
Log.d("Loki", "Should persist the GroupKeysConfig")
}
if (destination is Destination.ClosedGroup) {
config.free() // after they are used, free the temporary group configs
}
}
} catch (e: Exception) {
Log.e(TAG, "Error performing batch request", e)
return delegate.handleJobFailed(this, dispatcherName, e)
}
delegate.handleJobSucceeded(this, dispatcherName)
if (shouldRunAgain.get() && storage.getConfigSyncJob(destination) == null) {
// reschedule if something has updated since we started this job
JobQueue.shared.add(ConfigurationSyncJob(destination))
}
}
fun Destination.destinationPublicKey(): String =
when (this) {
is Destination.Contact -> publicKey
is Destination.ClosedGroup -> publicKey
else -> throw NullPointerException("Not public key for this destination")
}
override fun serialize(): Data {
val (type, address) =
when (destination) {
is Destination.Contact -> CONTACT_TYPE to destination.publicKey
is Destination.ClosedGroup -> GROUP_TYPE to destination.publicKey
else -> return Data.EMPTY
}
return Data.Builder()
.putInt(DESTINATION_TYPE_KEY, type)
.putString(DESTINATION_ADDRESS_KEY, address)
.build()
}
override fun getFactoryKey(): String = KEY
companion object {
const val TAG = "ConfigSyncJob"
const val KEY = "ConfigSyncJob"
// Keys used for DB storage
const val DESTINATION_ADDRESS_KEY = "destinationAddress"
const val DESTINATION_TYPE_KEY = "destinationType"
// type mappings
const val CONTACT_TYPE = 1
const val GROUP_TYPE = 2
fun ConfigBase.messageInformation(toDelete: MutableList<String>,
auth: SwarmAuth): ConfigMessageInformation {
val sentTimestamp = SnodeAPI.nowWithOffset
val (push, seqNo, obsoleteHashes) = push()
toDelete.addAll(obsoleteHashes)
val message =
SnodeMessage(
auth.accountId.hexString,
Base64.encodeBytes(push),
SnodeMessage.CONFIG_TTL,
sentTimestamp
)
return ConfigMessageInformation(
SnodeAPI.buildAuthenticatedStoreBatchInfo(
namespace(),
message,
auth,
),
this,
seqNo
)
}
fun GroupKeysConfig.messageInformation(auth: OwnedSwarmAuth): ConfigMessageInformation? {
val pending = pendingConfig() ?: return null
val sentTimestamp = SnodeAPI.nowWithOffset
val message =
SnodeMessage(
auth.accountId.hexString,
Base64.encodeBytes(pending),
SnodeMessage.CONFIG_TTL,
sentTimestamp
)
return ConfigMessageInformation(
SnodeAPI.buildAuthenticatedStoreBatchInfo(
namespace(),
message,
auth,
),
this,
0
)
}
}
class Factory : Job.Factory<ConfigurationSyncJob> {
override fun create(data: Data): ConfigurationSyncJob? {
if (!data.hasInt(DESTINATION_TYPE_KEY) || !data.hasString(DESTINATION_ADDRESS_KEY))
return null
val address = data.getString(DESTINATION_ADDRESS_KEY)
val destination =
when (data.getInt(DESTINATION_TYPE_KEY)) {
CONTACT_TYPE -> Destination.Contact(address)
GROUP_TYPE -> Destination.ClosedGroup(address)
else -> return null
}
return ConfigurationSyncJob(destination)
}
}
}

View File

@@ -1,199 +0,0 @@
package org.session.libsession.messaging.jobs
import android.widget.Toast
import com.google.protobuf.ByteString
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.withContext
import org.session.libsession.R
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.messaging.messages.control.GroupUpdated
import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.messaging.utilities.MessageAuthentication.buildGroupInviteSignature
import org.session.libsession.messaging.utilities.SodiumUtilities
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.utilities.StringSubstitutionConstants.GROUP_NAME_KEY
import org.session.libsession.utilities.StringSubstitutionConstants.NAME_KEY
import org.session.libsession.utilities.StringSubstitutionConstants.OTHER_NAME_KEY
import org.session.libsession.utilities.truncateIdForDisplay
import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateInviteMessage
import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateMessage
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.prettifiedDescription
class InviteContactsJob(val groupSessionId: String, val memberSessionIds: Array<String>) : Job {
companion object {
const val KEY = "InviteContactJob"
private const val GROUP = "group"
private const val MEMBER = "member"
}
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
override val maxFailureCount: Int = 1
override suspend fun execute(dispatcherName: String) {
val delegate = delegate ?: return
val configs = MessagingModuleConfiguration.shared.configFactory
val adminKey = configs.userGroups?.getClosedGroup(groupSessionId)?.adminKey
?: return delegate.handleJobFailedPermanently(
this,
dispatcherName,
NullPointerException("No admin key")
)
withContext(Dispatchers.IO) {
val sessionId = AccountId(groupSessionId)
val members = configs.getGroupMemberConfig(sessionId)
val info = configs.getGroupInfoConfig(sessionId)
val keys = configs.getGroupKeysConfig(sessionId, info, members, free = false)
if (members == null || info == null || keys == null) {
return@withContext delegate.handleJobFailedPermanently(
this@InviteContactsJob,
dispatcherName,
NullPointerException("One of the group configs was null")
)
}
val requests = memberSessionIds.map { memberSessionId ->
async {
// Make the request for this member
val member = members.get(memberSessionId) ?: return@async run {
InviteResult.failure(
memberSessionId,
NullPointerException("No group member ${memberSessionId.prettifiedDescription()} in members config")
)
}
members.set(member.setInvited())
configs.saveGroupConfigs(keys, info, members)
val accountId = AccountId(memberSessionId)
val subAccount = keys.makeSubAccount(accountId)
val timestamp = SnodeAPI.nowWithOffset
val signature = SodiumUtilities.sign(
buildGroupInviteSignature(accountId, timestamp),
adminKey
)
val groupInvite = GroupUpdateInviteMessage.newBuilder()
.setGroupSessionId(groupSessionId)
.setMemberAuthData(ByteString.copyFrom(subAccount))
.setAdminSignature(ByteString.copyFrom(signature))
.setName(info.getName())
val message = GroupUpdateMessage.newBuilder()
.setInviteMessage(groupInvite)
.build()
val update = GroupUpdated(message).apply {
sentTimestamp = timestamp
}
try {
MessageSender.send(update, Destination.Contact(memberSessionId), false)
.get()
InviteResult.success(memberSessionId)
} catch (e: Exception) {
InviteResult.failure(memberSessionId, e)
}
}
}
val results = requests.awaitAll()
results.forEach { result ->
if (!result.success) {
// update invite failed
val toSet = members.get(result.memberSessionId)
?.setInviteFailed()
?: return@forEach
members.set(toSet)
}
}
val failures = results.filter { !it.success }
// if there are failed invites, display a message
// assume job "success" even if we fail, the state of invites is tracked outside of this job
if (failures.isNotEmpty()) {
// show the failure toast
val storage = MessagingModuleConfiguration.shared.storage
val toaster = MessagingModuleConfiguration.shared.toaster
when (failures.size) {
1 -> {
val first = failures.first()
val firstString = first.memberSessionId.let { storage.getContactWithAccountID(it) }?.name
?: truncateIdForDisplay(first.memberSessionId)
withContext(Dispatchers.Main) {
toaster.toast(R.string.groupInviteFailedUser, Toast.LENGTH_LONG,
mapOf(
NAME_KEY to firstString,
GROUP_NAME_KEY to info.getName()
)
)
}
}
2 -> {
val (first, second) = failures
val firstString = first.memberSessionId.let { storage.getContactWithAccountID(it) }?.name
?: truncateIdForDisplay(first.memberSessionId)
val secondString = second.memberSessionId.let { storage.getContactWithAccountID(it) }?.name
?: truncateIdForDisplay(second.memberSessionId)
withContext(Dispatchers.Main) {
toaster.toast(R.string.groupInviteFailedTwo, Toast.LENGTH_LONG,
mapOf(
NAME_KEY to firstString,
OTHER_NAME_KEY to secondString,
GROUP_NAME_KEY to info.getName()
)
)
}
}
else -> {
val first = failures.first()
val firstString = first.memberSessionId.let { storage.getContactWithAccountID(it) }?.name
?: truncateIdForDisplay(first.memberSessionId)
val remaining = failures.size - 1
withContext(Dispatchers.Main) {
toaster.toast(R.string.groupInviteFailedMultiple, Toast.LENGTH_LONG,
mapOf(
NAME_KEY to firstString,
OTHER_NAME_KEY to remaining.toString(),
GROUP_NAME_KEY to info.getName()
)
)
}
}
}
}
configs.saveGroupConfigs(keys, info, members)
keys.free()
info.free()
members.free()
}
}
@Suppress("DataClassPrivateConstructor")
data class InviteResult private constructor(
val memberSessionId: String,
val success: Boolean,
val error: Exception? = null
) {
companion object {
fun success(memberSessionId: String) = InviteResult(memberSessionId, success = true)
fun failure(memberSessionId: String, error: Exception) =
InviteResult(memberSessionId, success = false, error)
}
}
override fun serialize(): Data =
Data.Builder()
.putString(GROUP, groupSessionId)
.putStringArray(MEMBER, memberSessionIds)
.build()
override fun getFactoryKey(): String = KEY
}

View File

@@ -29,7 +29,6 @@ class JobQueue : JobDelegate {
private val rxMediaDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
private val openGroupDispatcher = Executors.newFixedThreadPool(8).asCoroutineDispatcher()
private val txDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private val configDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private val scope = CoroutineScope(Dispatchers.Default) + SupervisorJob()
private val queue = Channel<Job>(UNLIMITED)
@@ -117,20 +116,14 @@ class JobQueue : JobDelegate {
val txQueue = Channel<Job>(capacity = UNLIMITED)
val mediaQueue = Channel<Job>(capacity = UNLIMITED)
val openGroupQueue = Channel<Job>(capacity = UNLIMITED)
val configQueue = Channel<Job>(capacity = UNLIMITED)
val receiveJob = processWithDispatcher(rxQueue, rxDispatcher, "rx", asynchronous = false)
val txJob = processWithDispatcher(txQueue, txDispatcher, "tx")
val mediaJob = processWithDispatcher(mediaQueue, rxMediaDispatcher, "media")
val openGroupJob = processWithOpenGroupDispatcher(openGroupQueue, openGroupDispatcher, "openGroup")
val configJob = processWithDispatcher(configQueue, configDispatcher, "configDispatcher")
while (isActive) {
when (val job = queue.receive()) {
is InviteContactsJob,
is ConfigurationSyncJob -> {
configQueue.send(job)
}
is NotifyPNServerJob,
is AttachmentUploadJob,
is GroupLeavingJob,
@@ -167,7 +160,6 @@ class JobQueue : JobDelegate {
txJob.cancel()
mediaJob.cancel()
openGroupJob.cancel()
configJob.cancel()
}
}
@@ -239,8 +231,6 @@ class JobQueue : JobDelegate {
BackgroundGroupAddJob.KEY,
OpenGroupDeleteJob.KEY,
RetrieveProfileAvatarJob.KEY,
ConfigurationSyncJob.KEY,
InviteContactsJob.KEY,
GroupLeavingJob.KEY,
LibSessionGroupLeavingJob.KEY
)

View File

@@ -16,7 +16,6 @@ class SessionJobManagerFactories {
GroupAvatarDownloadJob.KEY to GroupAvatarDownloadJob.Factory(),
BackgroundGroupAddJob.KEY to BackgroundGroupAddJob.Factory(),
OpenGroupDeleteJob.KEY to OpenGroupDeleteJob.Factory(),
ConfigurationSyncJob.KEY to ConfigurationSyncJob.Factory()
)
}
}

View File

@@ -25,7 +25,6 @@ import org.session.libsession.messaging.open_groups.OpenGroupApi.Capability
import org.session.libsession.messaging.open_groups.OpenGroupMessage
import org.session.libsession.messaging.utilities.MessageWrapper
import org.session.libsession.messaging.utilities.SodiumUtilities
import org.session.libsession.snode.GroupSubAccountSwarmAuth
import org.session.libsession.snode.OwnedSwarmAuth
import org.session.libsession.snode.RawResponsePromise
import org.session.libsession.snode.SnodeAPI
@@ -184,13 +183,9 @@ object MessageSender {
MessageEncrypter.encrypt(plaintext, encryptionKeyPair.hexEncodedPublicKey)
}
is Destination.ClosedGroup -> {
val groupKeys = configFactory.getGroupKeysConfig(AccountId(destination.publicKey)) ?: throw Error.NoKeyPair
val envelope = MessageWrapper.createEnvelope(kind, message.sentTimestamp!!, senderPublicKey, proto.build().toByteArray())
groupKeys.use { keys ->
if (keys.keys().isEmpty()) {
throw Error.EncryptionFailed
}
keys.encrypt(envelope.toByteArray())
configFactory.withGroupConfigs(AccountId(destination.publicKey)) {
it.groupKeys.encrypt(envelope.toByteArray())
}
}
else -> throw IllegalStateException("Destination should not be open group.")
@@ -252,27 +247,13 @@ object MessageSender {
namespaces.mapNotNull { namespace ->
if (destination is Destination.ClosedGroup) {
// possibly handle a failure for no user groups or no closed group signing key?
val group = configFactory.userGroups?.getClosedGroup(destination.publicKey) ?: return@mapNotNull null
val groupAuthData = group.authData
val groupAdminKey = group.adminKey
if (groupAuthData != null) {
configFactory.getGroupKeysConfig(AccountId(destination.publicKey))?.use { keys ->
SnodeAPI.sendMessage(
auth = GroupSubAccountSwarmAuth(keys, AccountId(destination.publicKey), groupAuthData),
message = snodeMessage,
namespace = namespace
)
}
} else if (groupAdminKey != null) {
SnodeAPI.sendMessage(
auth = OwnedSwarmAuth(AccountId(destination.publicKey), null, groupAdminKey),
message = snodeMessage,
namespace = namespace
)
} else {
Log.w("MessageSender", "No auth data for group")
null
}
val groupAuth = configFactory.getGroupAuth(AccountId(destination.publicKey)) ?: return@mapNotNull null
SnodeAPI.sendMessage(
auth = groupAuth,
message = snodeMessage,
namespace = namespace
)
} else {
SnodeAPI.sendMessage(snodeMessage, auth = null, namespace = namespace)
}
@@ -353,9 +334,9 @@ object MessageSender {
message.sentTimestamp = nowWithOffset
}
// Attach the blocks message requests info
configFactory.user?.let { user ->
configFactory.withUserConfigs { configs ->
if (message is VisibleMessage) {
message.blocksMessageRequests = !user.getCommunityMessageRequests()
message.blocksMessageRequests = !configs.userProfile.getCommunityMessageRequests()
}
}
val userEdKeyPair = MessagingModuleConfiguration.shared.storage.getUserED25519KeyPair()!!

View File

@@ -9,25 +9,18 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import network.loki.messenger.libsession_util.GroupInfoConfig
import network.loki.messenger.libsession_util.GroupKeysConfig
import network.loki.messenger.libsession_util.GroupMembersConfig
import network.loki.messenger.libsession_util.util.GroupInfo
import network.loki.messenger.libsession_util.util.Sodium
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.groups.GroupManagerV2
import org.session.libsession.messaging.jobs.BatchMessageReceiveJob
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveParameters
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.snode.GroupSubAccountSwarmAuth
import org.session.libsession.snode.OwnedSwarmAuth
import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.model.BatchResponse
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.withGroupConfigsOrNull
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log
@@ -41,7 +34,9 @@ class ClosedGroupPoller(
private val executor: CoroutineDispatcher,
private val closedGroupSessionId: AccountId,
private val configFactoryProtocol: ConfigFactoryProtocol,
private val groupManagerV2: GroupManagerV2) {
private val groupManagerV2: GroupManagerV2,
private val storage: StorageProtocol,
) {
data class ParsedRawMessage(
val data: ByteArray,
@@ -82,9 +77,8 @@ class ClosedGroupPoller(
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Starting closed group poller for ${closedGroupSessionId.hexString.take(4)}")
job?.cancel()
job = scope.launch(executor) {
val closedGroups = configFactoryProtocol.userGroups ?: return@launch
while (isActive) {
val group = closedGroups.getClosedGroup(closedGroupSessionId.hexString) ?: break
val group = configFactoryProtocol.withUserConfigs { it.userGroups.getClosedGroup(closedGroupSessionId.hexString) } ?: break
val nextPoll = runCatching { poll(group) }
when {
nextPoll.isFailure -> {
@@ -114,156 +108,122 @@ class ClosedGroupPoller(
private suspend fun poll(group: GroupInfo.ClosedGroupInfo): Long? = coroutineScope {
val snode = SnodeAPI.getSingleTargetSnode(closedGroupSessionId.hexString).await()
configFactoryProtocol.withGroupConfigsOrNull(closedGroupSessionId) { info, members, keys ->
val hashesToExtend = mutableSetOf<String>()
val groupAuth = configFactoryProtocol.getGroupAuth(closedGroupSessionId) ?: return@coroutineScope null
val configHashesToExtends = configFactoryProtocol.withGroupConfigs(closedGroupSessionId) {
buildSet {
addAll(it.groupKeys.currentHashes())
addAll(it.groupInfo.currentHashes())
addAll(it.groupMembers.currentHashes())
}
}
hashesToExtend += info.currentHashes()
hashesToExtend += members.currentHashes()
hashesToExtend += keys.currentHashes()
val adminKey = requireNotNull(configFactoryProtocol.withUserConfigs { it.userGroups.getClosedGroup(closedGroupSessionId.hexString) }) {
"Group doesn't exist"
}.adminKey
val authData = group.authData
val adminKey = group.adminKey
val groupAccountId = group.groupAccountId
val auth = if (authData != null) {
GroupSubAccountSwarmAuth(
groupKeysConfig = keys,
accountId = groupAccountId,
authData = authData
val pollingTasks = mutableListOf<Pair<String, Deferred<*>>>()
pollingTasks += "Poll revoked messages" to async {
handleRevoked(
SnodeAPI.sendBatchRequest(
snode,
closedGroupSessionId.hexString,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = groupAuth,
namespace = Namespace.REVOKED_GROUP_MESSAGES(),
maxSize = null,
),
Map::class.java
)
} else if (adminKey != null) {
OwnedSwarmAuth.ofClosedGroup(
groupAccountId = groupAccountId,
adminKey = adminKey
)
} else {
Log.e("ClosedGroupPoller", "No auth data for group, polling is cancelled")
return@coroutineScope null
}
val pollingTasks = mutableListOf<Pair<String, Deferred<*>>>()
pollingTasks += "Poll revoked messages" to async {
handleRevoked(
body = SnodeAPI.sendBatchRequest(
groupAccountId,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = auth,
namespace = Namespace.REVOKED_GROUP_MESSAGES(),
maxSize = null,
),
Map::class.java),
keys = keys
)
}
pollingTasks += "Poll group messages" to async {
handleMessages(
body = SnodeAPI.sendBatchRequest(
groupAccountId,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = auth,
namespace = Namespace.CLOSED_GROUP_MESSAGES(),
maxSize = null,
),
Map::class.java),
snode = snode,
keysConfig = keys
)
}
pollingTasks += "Poll group keys config" to async {
handleKeyPoll(
response = SnodeAPI.sendBatchRequest(
groupAccountId,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = auth,
namespace = keys.namespace(),
maxSize = null,
),
Map::class.java),
keysConfig = keys,
infoConfig = info,
membersConfig = members
)
}
pollingTasks += "Poll group info config" to async {
handleInfo(
response = SnodeAPI.sendBatchRequest(
groupAccountId,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = auth,
namespace = Namespace.CLOSED_GROUP_INFO(),
maxSize = null,
),
Map::class.java),
infoConfig = info
)
}
pollingTasks += "Poll group members config" to async {
handleMembers(
response = SnodeAPI.sendBatchRequest(
groupAccountId,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = auth,
namespace = Namespace.CLOSED_GROUP_MEMBERS(),
maxSize = null,
),
Map::class.java),
membersConfig = members
)
}
if (hashesToExtend.isNotEmpty() && adminKey != null) {
pollingTasks += "Extend group config TTL" to async {
SnodeAPI.sendBatchRequest(
groupAccountId,
SnodeAPI.buildAuthenticatedAlterTtlBatchRequest(
messageHashes = hashesToExtend.toList(),
auth = auth,
newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds,
extend = true
),
)
}
}
val errors = pollingTasks.mapNotNull { (name, task) ->
runCatching { task.await() }
.exceptionOrNull()
?.takeIf { it !is CancellationException }
?.let { RuntimeException("Error executing: $name", it) }
}
if (errors.isNotEmpty()) {
throw PollerException("Error polling closed group", errors)
}
// If we no longer have a group, stop poller
if (configFactoryProtocol.userGroups?.getClosedGroup(closedGroupSessionId.hexString) == null) return@coroutineScope null
// if poll result body is null here we don't have any things ig
if (ENABLE_LOGGING) Log.d(
"ClosedGroupPoller",
"Poll results @${SnodeAPI.nowWithOffset}:"
)
}
val requiresSync =
info.needsPush() || members.needsPush() || keys.needsRekey() || keys.pendingConfig() != null
pollingTasks += "Poll group messages" to async {
handleMessages(
body = SnodeAPI.sendBatchRequest(
snode,
closedGroupSessionId.hexString,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = groupAuth,
namespace = Namespace.CLOSED_GROUP_MESSAGES(),
maxSize = null,
),
Map::class.java),
snode = snode,
)
}
if (info.needsDump() || members.needsDump() || keys.needsDump()) {
configFactoryProtocol.saveGroupConfigs(keys, info, members)
pollingTasks += "Poll group keys config" to async {
handleKeyPoll(
response = SnodeAPI.sendBatchRequest(
snode,
closedGroupSessionId.hexString,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = groupAuth,
namespace = Namespace.ENCRYPTION_KEYS(),
maxSize = null,
),
Map::class.java),
)
}
pollingTasks += "Poll group info config" to async {
handleInfo(
response = SnodeAPI.sendBatchRequest(
snode,
closedGroupSessionId.hexString,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = groupAuth,
namespace = Namespace.CLOSED_GROUP_INFO(),
maxSize = null,
),
Map::class.java),
)
}
pollingTasks += "Poll group members config" to async {
handleMembers(
SnodeAPI.sendBatchRequest(
snode,
closedGroupSessionId.hexString,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = groupAuth,
namespace = Namespace.CLOSED_GROUP_MEMBERS(),
maxSize = null,
),
Map::class.java),
)
}
if (configHashesToExtends.isNotEmpty() && adminKey != null) {
pollingTasks += "Extend group config TTL" to async {
SnodeAPI.sendBatchRequest(
snode,
closedGroupSessionId.hexString,
SnodeAPI.buildAuthenticatedAlterTtlBatchRequest(
messageHashes = configHashesToExtends.toList(),
auth = groupAuth,
newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds,
extend = true
),
)
}
}
if (requiresSync) {
configFactoryProtocol.scheduleUpdate(Destination.ClosedGroup(closedGroupSessionId.hexString))
}
val errors = pollingTasks.mapNotNull { (name, task) ->
runCatching { task.await() }
.exceptionOrNull()
?.takeIf { it !is CancellationException }
?.let { RuntimeException("Error executing: $name", it) }
}
if (errors.isNotEmpty()) {
throw PollerException("Error polling closed group", errors)
}
POLL_INTERVAL // this might change in future
@@ -281,9 +241,8 @@ class ClosedGroupPoller(
}
}
private suspend fun handleRevoked(body: RawResponse, keys: GroupKeysConfig) {
private suspend fun handleRevoked(body: RawResponse) {
// This shouldn't ever return null at this point
val userSessionId = configFactoryProtocol.userSessionId()!!
val messages = body["messages"] as? List<*>
?: return Log.w("GroupPoller", "body didn't contain a list of messages")
messages.forEach { messageMap ->
@@ -305,7 +264,13 @@ class ClosedGroupPoller(
val message = decoded.decodeToString()
if (Sodium.KICKED_REGEX.matches(message)) {
val (sessionId, generation) = message.split("-")
if (sessionId == userSessionId.hexString && generation.toInt() >= keys.currentGeneration()) {
val currentKeysGeneration by lazy {
configFactoryProtocol.withGroupConfigs(closedGroupSessionId) {
it.groupKeys.currentGeneration()
}
}
if (sessionId == storage.getUserPublicKey() && generation.toInt() >= currentKeysGeneration) {
try {
groupManagerV2.handleKicked(closedGroupSessionId)
} catch (e: Exception) {
@@ -318,51 +283,51 @@ class ClosedGroupPoller(
}
}
private fun handleKeyPoll(response: RawResponse,
keysConfig: GroupKeysConfig,
infoConfig: GroupInfoConfig,
membersConfig: GroupMembersConfig) {
private fun handleKeyPoll(response: RawResponse) {
// get all the data to hash objects and process them
val allMessages = parseMessages(response)
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Total key messages this poll: ${allMessages.size}")
var total = 0
allMessages.forEach { (message, hash, timestamp) ->
if (keysConfig.loadKey(message, hash, timestamp, infoConfig, membersConfig)) {
total++
configFactoryProtocol.withMutableGroupConfigs(closedGroupSessionId) { configs ->
if (configs.loadKeys(message, hash, timestamp)) {
total++
}
}
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Merged $hash for keys on ${closedGroupSessionId.hexString}")
}
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Total key messages consumed: $total")
}
private fun handleInfo(response: RawResponse,
infoConfig: GroupInfoConfig) {
private fun handleInfo(response: RawResponse) {
val messages = parseMessages(response)
messages.forEach { (message, hash, _) ->
infoConfig.merge(hash to message)
configFactoryProtocol.withMutableGroupConfigs(closedGroupSessionId) { configs ->
configs.groupInfo.merge(arrayOf(hash to message))
}
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Merged $hash for info on ${closedGroupSessionId.hexString}")
}
if (messages.isNotEmpty()) {
val lastTimestamp = messages.maxOf { it.timestamp }
MessagingModuleConfiguration.shared.storage.notifyConfigUpdates(infoConfig, lastTimestamp)
}
}
private fun handleMembers(response: RawResponse,
membersConfig: GroupMembersConfig) {
private fun handleMembers(response: RawResponse) {
parseMessages(response).forEach { (message, hash, _) ->
membersConfig.merge(hash to message)
configFactoryProtocol.withMutableGroupConfigs(closedGroupSessionId) { configs ->
configs.groupMembers.merge(arrayOf(hash to message))
}
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Merged $hash for members on ${closedGroupSessionId.hexString}")
}
}
private fun handleMessages(body: RawResponse, snode: Snode, keysConfig: GroupKeysConfig) {
val messages = SnodeAPI.parseRawMessagesResponse(
rawResponse = body,
snode = snode,
publicKey = closedGroupSessionId.hexString,
decrypt = keysConfig::decrypt
)
private fun handleMessages(body: RawResponse, snode: Snode) {
val messages = configFactoryProtocol.withGroupConfigs(closedGroupSessionId) {
SnodeAPI.parseRawMessagesResponse(
rawResponse = body,
snode = snode,
publicKey = closedGroupSessionId.hexString,
decrypt = it.groupKeys::decrypt,
)
}
val parameters = messages.map { (envelope, serverHash) ->
MessageReceiveParameters(

View File

@@ -3,10 +3,17 @@ package org.session.libsession.messaging.sending_receiving.pollers
import android.util.SparseArray
import androidx.core.util.valueIterator
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import network.loki.messenger.libsession_util.ConfigBase
import network.loki.messenger.libsession_util.Contacts
import network.loki.messenger.libsession_util.ConversationVolatileConfig
import network.loki.messenger.libsession_util.MutableConfig
import network.loki.messenger.libsession_util.MutableContacts
import network.loki.messenger.libsession_util.MutableConversationVolatileConfig
import network.loki.messenger.libsession_util.MutableUserGroupsConfig
import network.loki.messenger.libsession_util.MutableUserProfile
import network.loki.messenger.libsession_util.UserGroupsConfig
import network.loki.messenger.libsession_util.UserProfile
import nl.komponents.kovenant.Deferred
@@ -24,6 +31,8 @@ import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeModule
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.Contact.Name
import org.session.libsession.utilities.MutableGroupConfigs
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
@@ -135,9 +144,7 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
}
}
private fun processConfig(snode: Snode, rawMessages: RawResponse, namespace: Int, forConfigObject: ConfigBase?) {
if (forConfigObject == null) return
private fun processConfig(snode: Snode, rawMessages: RawResponse, namespace: Int, forConfig: Class<out MutableConfig>) {
val messages = rawMessages["messages"] as? List<*>
val processed = if (!messages.isNullOrEmpty()) {
SnodeAPI.updateLastMessageHashValueIfPossible(snode, userPublicKey, messages, namespace)
@@ -152,20 +159,19 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
if (processed.isEmpty()) return
var latestMessageTimestamp: Long? = null
processed.forEach { (body, hash, timestamp) ->
processed.forEach { (body, hash, _) ->
try {
forConfigObject.merge(hash to body)
latestMessageTimestamp = if (timestamp > (latestMessageTimestamp ?: 0L)) { timestamp } else { latestMessageTimestamp }
configFactory.withMutableUserConfigs { configs ->
configs
.allConfigs()
.filter { it.javaClass.isInstance(forConfig) }
.first()
.merge(arrayOf(hash to body))
}
} catch (e: Exception) {
Log.e(TAG, e)
}
}
// process new results
// latestMessageTimestamp should always be non-null if the config object needs dump
if (forConfigObject.needsDump() && latestMessageTimestamp != null) {
configFactory.persist(forConfigObject, latestMessageTimestamp ?: SnodeAPI.nowWithOffset)
}
}
private fun poll(userProfileOnly: Boolean, snode: Snode, deferred: Deferred<Unit, Exception>): Promise<Unit, Exception> {
@@ -181,7 +187,8 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
val hashesToExtend = mutableSetOf<String>()
val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth)
configFactory.user?.let { config ->
configFactory.withUserConfigs {
val config = it.userProfile
hashesToExtend += config.currentHashes()
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
@@ -189,7 +196,7 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
namespace = config.namespace(),
maxSize = -8
)
}?.let { request ->
}.let { request ->
requests += request
}
@@ -199,7 +206,7 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
auth = userAuth,
newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds,
extend = true
)?.let { extensionRequest ->
).let { extensionRequest ->
requests += extensionRequest
}
}
@@ -217,7 +224,7 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
if (body == null) {
Log.e(TAG, "Batch sub-request didn't contain a body")
} else {
processConfig(snode, body, configFactory.user!!.namespace(), configFactory.user)
processConfig(snode, body, Namespace.USER_PROFILE(), MutableUserProfile::class.java)
}
}
}
@@ -230,6 +237,7 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
}
}
private fun poll(snode: Snode, deferred: Deferred<Unit, Exception>): Promise<Unit, Exception> {
if (!hasStarted) { return Promise.ofFail(PromiseCanceledException()) }
return task {
@@ -244,17 +252,19 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
}
// get the latest convo info volatile
val hashesToExtend = mutableSetOf<String>()
configFactory.getUserConfigs().map { config ->
hashesToExtend += config.currentHashes()
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = userAuth,
namespace = config.namespace(),
maxSize = -8
)
}.forEach { request ->
configFactory.withUserConfigs {
it.allConfigs().map { config ->
hashesToExtend += config.currentHashes()
config.namespace() to SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = userAuth,
namespace = config.namespace(),
maxSize = -8
)
}
}.forEach { (namespace, request) ->
// namespaces here should always be set
requestSparseArray[request.namespace!!] = request
requestSparseArray[namespace] = request
}
val requests =
@@ -266,7 +276,7 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
auth = userAuth,
newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds,
extend = true
)?.let { extensionRequest ->
).let { extensionRequest ->
requests += extensionRequest
}
}
@@ -280,14 +290,15 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
val responseList = (rawResponses["results"] as List<RawResponse>)
// in case we had null configs, the array won't be fully populated
// index of the sparse array key iterator should be the request index, with the key being the namespace
listOfNotNull(
configFactory.user?.namespace(),
configFactory.contacts?.namespace(),
configFactory.userGroups?.namespace(),
configFactory.convoVolatile?.namespace()
).map {
it to requestSparseArray.indexOfKey(it)
}.filter { (_, i) -> i >= 0 }.forEach { (key, requestIndex) ->
sequenceOf(
Namespace.USER_PROFILE() to MutableUserProfile::class.java,
Namespace.CONTACTS() to MutableContacts::class.java,
Namespace.GROUPS() to MutableUserGroupsConfig::class.java,
Namespace.CONVO_INFO_VOLATILE() to MutableConversationVolatileConfig::class.java
).map { (namespace, configClass) ->
Triple(namespace, configClass, requestSparseArray.indexOfKey(namespace))
}.filter { (_, _, i) -> i >= 0 }
.forEach { (namespace, configClass, requestIndex) ->
responseList.getOrNull(requestIndex)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) {
Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
@@ -298,16 +309,8 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
Log.e(TAG, "Batch sub-request didn't contain a body")
return@forEach
}
if (key == Namespace.DEFAULT()) {
return@forEach // continue, skip default namespace
} else {
when (ConfigBase.kindFor(key)) {
UserProfile::class.java -> processConfig(snode, body, key, configFactory.user)
Contacts::class.java -> processConfig(snode, body, key, configFactory.contacts)
ConversationVolatileConfig::class.java -> processConfig(snode, body, key, configFactory.convoVolatile)
UserGroupsConfig::class.java -> processConfig(snode, body, key, configFactory.userGroups)
}
}
processConfig(snode, body, namespace, configClass)
}
}

View File

@@ -1,39 +0,0 @@
package org.session.libsession.snode
import network.loki.messenger.libsession_util.GroupKeysConfig
import org.session.libsignal.utilities.AccountId
/**
* A [SwarmAuth] that signs message using a group's subaccount. This should be used for non-admin
* users of a group signing their messages.
*/
class GroupSubAccountSwarmAuth(
private val groupKeysConfig: GroupKeysConfig,
override val accountId: AccountId,
private val authData: ByteArray
) : SwarmAuth {
override val ed25519PublicKeyHex: String? get() = null
init {
check(authData.size == 100) {
"Invalid auth data size, expecting 100 but got ${authData.size}"
}
}
override fun sign(data: ByteArray): Map<String, String> {
val auth = groupKeysConfig.subAccountSign(data, authData)
return buildMap {
put("subaccount", auth.subAccount)
put("subaccount_sig", auth.subAccountSig)
put("signature", auth.signature)
}
}
override fun signForPushRegistry(data: ByteArray): Map<String, String> {
val auth = groupKeysConfig.subAccountSign(data, authData)
return buildMap {
put("subkey_tag", auth.subAccount)
put("signature", auth.signature)
}
}
}

View File

@@ -579,7 +579,8 @@ object SnodeAPI {
}
private data class RequestInfo(
val accountId: AccountId,
val snode: Snode,
val publicKey: String,
val request: SnodeBatchRequestInfo,
val responseType: Class<*>,
val callback: SendChannel<Result<Any>>,
@@ -594,15 +595,17 @@ object SnodeAPI {
val batchWindowMills = 100L
data class BatchKey(val snodeAddress: String, val publicKey: String)
@Suppress("OPT_IN_USAGE")
GlobalScope.launch {
val batches = hashMapOf<AccountId, MutableList<RequestInfo>>()
val batches = hashMapOf<BatchKey, MutableList<RequestInfo>>()
while (true) {
val batch = select<List<RequestInfo>?> {
// If we receive a request, add it to the batch
batchRequests.onReceive {
batches.getOrPut(it.accountId) { mutableListOf() }.add(it)
batches.getOrPut(BatchKey(it.snode.address, it.publicKey)) { mutableListOf() }.add(it)
null
}
@@ -622,11 +625,11 @@ object SnodeAPI {
if (batch != null) {
launch batch@{
val accountId = batch.first().accountId
val snode = batch.first().snode
val responses = try {
getBatchResponse(
snode = getSingleTargetSnode(accountId.hexString).await(),
publicKey = accountId.hexString,
snode = snode,
publicKey = batch.first().publicKey,
requests = batch.map { it.request }, sequence = false
)
} catch (e: Exception) {
@@ -660,21 +663,23 @@ object SnodeAPI {
}
suspend fun <T> sendBatchRequest(
swarmAccount: AccountId,
snode: Snode,
publicKey: String,
request: SnodeBatchRequestInfo,
responseType: Class<T>,
): T {
val callback = Channel<Result<T>>()
@Suppress("UNCHECKED_CAST")
batchedRequestsSender.send(RequestInfo(swarmAccount, request, responseType, callback as SendChannel<Any>))
batchedRequestsSender.send(RequestInfo(snode, publicKey, request, responseType, callback as SendChannel<Any>))
return callback.receive().getOrThrow()
}
suspend fun sendBatchRequest(
swarmAccount: AccountId,
snode: Snode,
publicKey: String,
request: SnodeBatchRequestInfo,
): JsonNode {
return sendBatchRequest(swarmAccount, request, JsonNode::class.java)
return sendBatchRequest(snode, publicKey, request, JsonNode::class.java)
}
suspend fun getBatchResponse(
@@ -803,9 +808,9 @@ object SnodeAPI {
}
return scope.retrySuspendAsPromise(maxRetryCount) {
val destination = message.recipient
sendBatchRequest(
swarmAccount = AccountId(destination),
snode = getSingleTargetSnode(message.recipient).await(),
publicKey = message.recipient,
request = SnodeBatchRequestInfo(
method = Snode.Method.SendMessage.rawValue,
params = params,

View File

@@ -10,5 +10,8 @@ data class BatchResponse @JsonCreator constructor(
data class Item @JsonCreator constructor(
@param:JsonProperty("code") val code: Int,
@param:JsonProperty("body") val body: JsonNode,
)
) {
val isSuccessful: Boolean
get() = code in 200..299
}
}

View File

@@ -1,82 +1,109 @@
package org.session.libsession.utilities
import kotlinx.coroutines.flow.Flow
import network.loki.messenger.libsession_util.Config
import network.loki.messenger.libsession_util.ConfigBase
import network.loki.messenger.libsession_util.Contacts
import network.loki.messenger.libsession_util.ConversationVolatileConfig
import network.loki.messenger.libsession_util.GroupInfoConfig
import network.loki.messenger.libsession_util.GroupKeysConfig
import network.loki.messenger.libsession_util.GroupMembersConfig
import network.loki.messenger.libsession_util.UserGroupsConfig
import network.loki.messenger.libsession_util.UserProfile
import org.session.libsession.messaging.messages.Destination
import network.loki.messenger.libsession_util.MutableConfig
import network.loki.messenger.libsession_util.MutableContacts
import network.loki.messenger.libsession_util.MutableConversationVolatileConfig
import network.loki.messenger.libsession_util.MutableGroupInfoConfig
import network.loki.messenger.libsession_util.MutableGroupKeysConfig
import network.loki.messenger.libsession_util.MutableGroupMembersConfig
import network.loki.messenger.libsession_util.MutableUserGroupsConfig
import network.loki.messenger.libsession_util.MutableUserProfile
import network.loki.messenger.libsession_util.ReadableConfig
import network.loki.messenger.libsession_util.ReadableContacts
import network.loki.messenger.libsession_util.ReadableConversationVolatileConfig
import network.loki.messenger.libsession_util.ReadableGroupInfoConfig
import network.loki.messenger.libsession_util.ReadableGroupKeysConfig
import network.loki.messenger.libsession_util.ReadableGroupMembersConfig
import network.loki.messenger.libsession_util.ReadableUserGroupsConfig
import network.loki.messenger.libsession_util.ReadableUserProfile
import org.session.libsession.snode.SwarmAuth
import org.session.libsignal.utilities.AccountId
interface ConfigFactoryProtocol {
val configUpdateNotifications: Flow<ConfigUpdateNotification>
val user: UserProfile?
val contacts: Contacts?
val convoVolatile: ConversationVolatileConfig?
val userGroups: UserGroupsConfig?
fun <T> withUserConfigs(cb: (UserConfigs) -> T): T
fun <T> withMutableUserConfigs(cb: (MutableUserConfigs) -> T): T
val configUpdateNotifications: Flow<Unit>
fun getGroupInfoConfig(groupSessionId: AccountId): GroupInfoConfig?
fun getGroupMemberConfig(groupSessionId: AccountId): GroupMembersConfig?
fun getGroupKeysConfig(groupSessionId: AccountId,
info: GroupInfoConfig? = null,
members: GroupMembersConfig? = null,
free: Boolean = true): GroupKeysConfig?
fun getUserConfigs(): List<ConfigBase>
fun persist(forConfigObject: Config, timestamp: Long, forPublicKey: String? = null)
fun <T> withGroupConfigs(groupId: AccountId, cb: (GroupConfigs) -> T): T
fun <T> withMutableGroupConfigs(groupId: AccountId, cb: (MutableGroupConfigs) -> T): T
fun conversationInConfig(publicKey: String?, groupPublicKey: String?, openGroupId: String?, visibleOnly: Boolean): Boolean
fun canPerformChange(variant: String, publicKey: String, changeTimestampMs: Long): Boolean
fun saveGroupConfigs(
groupKeys: GroupKeysConfig,
groupInfo: GroupInfoConfig,
groupMembers: GroupMembersConfig
)
fun removeGroup(closedGroupId: AccountId)
fun scheduleUpdate(destination: Destination)
fun constructGroupKeysConfig(
groupSessionId: AccountId,
info: GroupInfoConfig,
members: GroupMembersConfig
): GroupKeysConfig?
fun getGroupAuth(groupId: AccountId): SwarmAuth?
fun removeGroup(groupId: AccountId)
fun maybeDecryptForUser(encoded: ByteArray,
domain: String,
closedGroupSessionId: AccountId): ByteArray?
fun userSessionId(): AccountId?
}
interface ConfigFactoryUpdateListener {
fun notifyUpdates(forConfigObject: Config, messageTimestamp: Long)
interface UserConfigs {
val contacts: ReadableContacts
val userGroups: ReadableUserGroupsConfig
val userProfile: ReadableUserProfile
val convoInfoVolatile: ReadableConversationVolatileConfig
fun allConfigs(): Sequence<ReadableConfig> = sequenceOf(contacts, userGroups, userProfile, convoInfoVolatile)
}
/**
* Access group configs if they exist, otherwise return null.
*
* Note: The config objects will be closed after the callback is executed. Any attempt
* to store the config objects will result in a native crash.
*/
inline fun <T: Any> ConfigFactoryProtocol.withGroupConfigsOrNull(
groupId: AccountId,
cb: (GroupInfoConfig, GroupMembersConfig, GroupKeysConfig) -> T
): T? {
getGroupInfoConfig(groupId)?.use { groupInfo ->
getGroupMemberConfig(groupId)?.use { groupMembers ->
getGroupKeysConfig(groupId, groupInfo, groupMembers)?.use { groupKeys ->
return cb(groupInfo, groupMembers, groupKeys)
}
}
}
interface MutableUserConfigs : UserConfigs {
override val contacts: MutableContacts
override val userGroups: MutableUserGroupsConfig
override val userProfile: MutableUserProfile
override val convoInfoVolatile: MutableConversationVolatileConfig
return null
}
override fun allConfigs(): Sequence<MutableConfig> = sequenceOf(contacts, userGroups, userProfile, convoInfoVolatile)
}
interface GroupConfigs {
val groupInfo: ReadableGroupInfoConfig
val groupMembers: ReadableGroupMembersConfig
val groupKeys: ReadableGroupKeysConfig
}
interface MutableGroupConfigs : GroupConfigs {
override val groupInfo: MutableGroupInfoConfig
override val groupMembers: MutableGroupMembersConfig
override val groupKeys: MutableGroupKeysConfig
fun loadKeys(message: ByteArray, hash: String, timestamp: Long): Boolean
fun rekeys()
}
sealed interface ConfigUpdateNotification {
data object UserConfigs : ConfigUpdateNotification
data class GroupConfigsUpdated(val groupId: AccountId) : ConfigUpdateNotification
data class GroupConfigsDeleted(val groupId: AccountId) : ConfigUpdateNotification
}
//interface ConfigFactoryUpdateListener {
// fun notifyUpdates(forConfigObject: Config, messageTimestamp: Long)
//}
///**
// * Access group configs if they exist, otherwise return null.
// *
// * Note: The config objects will be closed after the callback is executed. Any attempt
// * to store the config objects will result in a native crash.
// */
//inline fun <T: Any> ConfigFactoryProtocol.withGroupConfigsOrNull(
// groupId: AccountId,
// cb: (GroupInfoConfig, GroupMembersConfig, GroupKeysConfig) -> T
//): T? {
// getGroupInfoConfig(groupId)?.use { groupInfo ->
// getGroupMemberConfig(groupId)?.use { groupMembers ->
// getGroupKeysConfig(groupId, groupInfo, groupMembers)?.use { groupKeys ->
// return cb(groupInfo, groupMembers, groupKeys)
// }
// }
// }
//
// return null
//}