Group keys and message response improvement

This commit is contained in:
SessionHero01 2024-10-04 10:20:16 +10:00
parent 08ee07f5c5
commit 8dd2f364ce
No known key found for this signature in database
12 changed files with 257 additions and 211 deletions

View File

@ -27,7 +27,7 @@ import network.loki.messenger.libsession_util.util.UserPic
import org.session.libsession.database.StorageProtocol import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.messages.control.ConfigurationMessage import org.session.libsession.messaging.messages.control.ConfigurationMessage
import org.session.libsession.snode.OwnedSwarmAuth import org.session.libsession.snode.OwnedSwarmAuth
import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeClock
import org.session.libsession.snode.SwarmAuth import org.session.libsession.snode.SwarmAuth
import org.session.libsession.utilities.Address import org.session.libsession.utilities.Address
import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsession.utilities.ConfigFactoryProtocol
@ -52,7 +52,6 @@ import org.thoughtcrime.securesms.database.ConfigDatabase
import org.thoughtcrime.securesms.database.LokiThreadDatabase import org.thoughtcrime.securesms.database.LokiThreadDatabase
import org.thoughtcrime.securesms.database.ThreadDatabase import org.thoughtcrime.securesms.database.ThreadDatabase
import org.thoughtcrime.securesms.groups.GroupManager import org.thoughtcrime.securesms.groups.GroupManager
import java.util.concurrent.ConcurrentHashMap
import javax.inject.Inject import javax.inject.Inject
import javax.inject.Singleton import javax.inject.Singleton
@ -64,163 +63,23 @@ class ConfigFactory @Inject constructor(
private val threadDb: ThreadDatabase, private val threadDb: ThreadDatabase,
private val lokiThreadDatabase: LokiThreadDatabase, private val lokiThreadDatabase: LokiThreadDatabase,
private val storage: Lazy<StorageProtocol>, private val storage: Lazy<StorageProtocol>,
private val textSecurePreferences: TextSecurePreferences private val textSecurePreferences: TextSecurePreferences,
private val clock: SnodeClock,
) : ConfigFactoryProtocol { ) : ConfigFactoryProtocol {
companion object { companion object {
// This is a buffer period within which we will process messages which would result in a // This is a buffer period within which we will process messages which would result in a
// config change, any message which would normally result in a config change which was sent // config change, any message which would normally result in a config change which was sent
// before `lastConfigMessage.timestamp - configChangeBufferPeriod` will not actually have // before `lastConfigMessage.timestamp - configChangeBufferPeriod` will not actually have
// it's changes applied (control text will still be added though) // it's changes applied (control text will still be added though)
const val configChangeBufferPeriod: Long = (2 * 60 * 1000) private const val CONFIG_CHANGE_BUFFER_PERIOD: Long = 2 * 60 * 1000L
} }
init { init {
System.loadLibrary("session_util") System.loadLibrary("session_util")
} }
private class UserConfigsImpl( private val userConfigs = HashMap<AccountId, UserConfigsImpl>()
userEd25519SecKey: ByteArray, private val groupConfigs = HashMap<AccountId, GroupConfigsImpl>()
private val userAccountId: AccountId,
private val configDatabase: ConfigDatabase,
storage: StorageProtocol,
threadDb: ThreadDatabase,
contactsDump: ByteArray? = configDatabase.retrieveConfigAndHashes(
ConfigDatabase.CONTACTS_VARIANT,
userAccountId.hexString
),
userGroupsDump: ByteArray? = configDatabase.retrieveConfigAndHashes(
ConfigDatabase.USER_GROUPS_VARIANT,
userAccountId.hexString
),
userProfileDump: ByteArray? = configDatabase.retrieveConfigAndHashes(
ConfigDatabase.USER_PROFILE_VARIANT,
userAccountId.hexString
),
convoInfoDump: ByteArray? = configDatabase.retrieveConfigAndHashes(
ConfigDatabase.CONVO_INFO_VARIANT,
userAccountId.hexString
)
) : MutableUserConfigs {
override val contacts = Contacts(
ed25519SecretKey = userEd25519SecKey,
initialDump = contactsDump,
)
override val userGroups = UserGroupsConfig(
ed25519SecretKey = userEd25519SecKey,
initialDump = userGroupsDump
)
override val userProfile = UserProfile(
ed25519SecretKey = userEd25519SecKey,
initialDump = userProfileDump
)
override val convoInfoVolatile = ConversationVolatileConfig(
ed25519SecretKey = userEd25519SecKey,
initialDump = convoInfoDump,
)
init {
if (contactsDump == null) {
contacts.initFrom(storage)
}
if (userGroupsDump == null) {
userGroups.initFrom(storage)
}
if (userProfileDump == null) {
userProfile.initFrom(storage)
}
if (convoInfoDump == null) {
convoInfoVolatile.initFrom(storage, threadDb)
}
}
/**
* Persists the config if it is dirty and returns the list of classes that were persisted
*/
fun persistIfDirty(): Boolean {
return sequenceOf(
contacts to ConfigDatabase.CONTACTS_VARIANT,
userGroups to ConfigDatabase.USER_GROUPS_VARIANT,
userProfile to ConfigDatabase.USER_PROFILE_VARIANT,
convoInfoVolatile to ConfigDatabase.CONVO_INFO_VARIANT
).fold(false) { acc, (config, variant) ->
if (config.needsDump()) {
configDatabase.storeConfig(
variant = variant,
publicKey = userAccountId.hexString,
data = config.dump(),
timestamp = SnodeAPI.nowWithOffset
)
true
} else {
acc
}
}
}
}
private class GroupConfigsImpl(
userEd25519SecKey: ByteArray,
private val groupAccountId: AccountId,
groupAdminKey: ByteArray?,
private val configDatabase: ConfigDatabase
) : MutableGroupConfigs {
override val groupInfo = GroupInfoConfig(
groupPubKey = groupAccountId.pubKeyBytes,
groupAdminKey = groupAdminKey,
initialDump = configDatabase.retrieveConfigAndHashes(
ConfigDatabase.INFO_VARIANT,
groupAccountId.hexString
)
)
override val groupMembers = GroupMembersConfig(
groupPubKey = groupAccountId.pubKeyBytes,
groupAdminKey = groupAdminKey,
initialDump = configDatabase.retrieveConfigAndHashes(
ConfigDatabase.MEMBER_VARIANT,
groupAccountId.hexString
)
)
override val groupKeys = GroupKeysConfig(
userSecretKey = userEd25519SecKey,
groupPublicKey = groupAccountId.pubKeyBytes,
groupAdminKey = groupAdminKey,
initialDump = configDatabase.retrieveConfigAndHashes(
ConfigDatabase.KEYS_VARIANT,
groupAccountId.hexString
),
info = groupInfo,
members = groupMembers
)
fun dumpIfNeeded(): Boolean {
if (groupInfo.needsDump() || groupMembers.needsDump() || groupKeys.needsDump()) {
configDatabase.storeGroupConfigs(
publicKey = groupAccountId.hexString,
keysConfig = groupKeys.dump(),
infoConfig = groupInfo.dump(),
memberConfig = groupMembers.dump(),
timestamp = SnodeAPI.nowWithOffset
)
return true
}
return false
}
val isDirty: Boolean
get() = groupInfo.dirty() || groupMembers.dirty()
override fun rekey() {
groupKeys.rekey(groupInfo.pointer, groupMembers.pointer)
}
}
private val userConfigs = ConcurrentHashMap<AccountId, UserConfigsImpl>()
private val groupConfigs = ConcurrentHashMap<AccountId, GroupConfigsImpl>()
private val _configUpdateNotifications = MutableSharedFlow<ConfigUpdateNotification>( private val _configUpdateNotifications = MutableSharedFlow<ConfigUpdateNotification>(
extraBufferCapacity = 5, // The notifications are normally important so we can afford to buffer a few extraBufferCapacity = 5, // The notifications are normally important so we can afford to buffer a few
@ -240,14 +99,16 @@ class ConfigFactory @Inject constructor(
override fun <T> withUserConfigs(cb: (UserConfigs) -> T): T { override fun <T> withUserConfigs(cb: (UserConfigs) -> T): T {
val userAccountId = requiresCurrentUserAccountId() val userAccountId = requiresCurrentUserAccountId()
val configs = userConfigs.getOrPut(userAccountId) { val configs = synchronized(userConfigs) {
UserConfigsImpl( userConfigs.getOrPut(userAccountId) {
requiresCurrentUserED25519SecKey(), UserConfigsImpl(
userAccountId, requiresCurrentUserED25519SecKey(),
threadDb = threadDb, userAccountId,
configDatabase = configDatabase, threadDb = threadDb,
storage = storage.get() configDatabase = configDatabase,
) storage = storage.get()
)
}
} }
return synchronized(configs) { return synchronized(configs) {
@ -294,20 +155,27 @@ class ConfigFactory @Inject constructor(
override fun <T> withMutableUserConfigs(cb: (MutableUserConfigs) -> T): T { override fun <T> withMutableUserConfigs(cb: (MutableUserConfigs) -> T): T {
return doWithMutableUserConfigs { return doWithMutableUserConfigs {
cb(it) to it.persistIfDirty() cb(it) to it.persistIfDirty(clock)
} }
} }
override fun <T> withGroupConfigs(groupId: AccountId, cb: (GroupConfigs) -> T): T { override fun <T> withGroupConfigs(groupId: AccountId, cb: (GroupConfigs) -> T): T {
val configs = groupConfigs.getOrPut(groupId) { val groupAdminKey = getClosedGroup(groupId)?.adminKey
val groupAdminKey = getClosedGroup(groupId)?.adminKey
GroupConfigsImpl( val configs = synchronized(groupConfigs) {
requiresCurrentUserED25519SecKey(), var value = groupConfigs[groupId]
groupId, if (value == null || value.groupKeys.admin() != (groupAdminKey != null)) {
groupAdminKey, // No existing configs or existing configs have different admin settings with what we currently have
configDatabase // Create a new group configs
) value = GroupConfigsImpl(
requiresCurrentUserED25519SecKey(),
groupId,
groupAdminKey,
configDatabase
).also { groupConfigs[groupId] = it }
}
value
} }
return synchronized(configs) { return synchronized(configs) {
@ -342,7 +210,7 @@ class ConfigFactory @Inject constructor(
cb: (MutableGroupConfigs) -> T cb: (MutableGroupConfigs) -> T
): T { ): T {
return doWithMutableGroupConfigs(recreateConfigInstances = recreateConfigInstances, groupId = groupId) { return doWithMutableGroupConfigs(recreateConfigInstances = recreateConfigInstances, groupId = groupId) {
cb(it) to it.dumpIfNeeded() cb(it) to it.dumpIfNeeded(clock)
} }
} }
@ -359,7 +227,7 @@ class ConfigFactory @Inject constructor(
configDatabase.deleteGroupConfigs(groupId) configDatabase.deleteGroupConfigs(groupId)
} }
override fun maybeDecryptForUser( override fun decryptForUser(
encoded: ByteArray, encoded: ByteArray,
domain: String, domain: String,
closedGroupSessionId: AccountId closedGroupSessionId: AccountId
@ -392,7 +260,7 @@ class ConfigFactory @Inject constructor(
val membersMerged = members.isNotEmpty() && val membersMerged = members.isNotEmpty() &&
configs.groupMembers.merge(members.map { it.hash to it.data }.toTypedArray()).isNotEmpty() configs.groupMembers.merge(members.map { it.hash to it.data }.toTypedArray()).isNotEmpty()
configs.dumpIfNeeded() configs.dumpIfNeeded(clock)
Unit to (keysLoaded || infoMerged || membersMerged) Unit to (keysLoaded || infoMerged || membersMerged)
} }
@ -414,7 +282,7 @@ class ConfigFactory @Inject constructor(
convoInfoVolatile?.let { (push, result) -> configs.convoInfoVolatile.confirmPushed(push.seqNo, result.hash) } convoInfoVolatile?.let { (push, result) -> configs.convoInfoVolatile.confirmPushed(push.seqNo, result.hash) }
userGroups?.let { (push, result) -> configs.userGroups.confirmPushed(push.seqNo, result.hash) } userGroups?.let { (push, result) -> configs.userGroups.confirmPushed(push.seqNo, result.hash) }
Unit to configs.persistIfDirty() Unit to configs.persistIfDirty(clock)
} }
} }
@ -438,7 +306,7 @@ class ConfigFactory @Inject constructor(
} }
} }
Unit to configs.dumpIfNeeded() Unit to configs.dumpIfNeeded(clock)
} }
} }
@ -489,7 +357,7 @@ class ConfigFactory @Inject constructor(
configDatabase.retrieveConfigLastUpdateTimestamp(variant, publicKey) configDatabase.retrieveConfigLastUpdateTimestamp(variant, publicKey)
// Ensure the change occurred after the last config message was handled (minus the buffer period) // Ensure the change occurred after the last config message was handled (minus the buffer period)
return (changeTimestampMs >= (lastUpdateTimestampMs - configChangeBufferPeriod)) return (changeTimestampMs >= (lastUpdateTimestampMs - CONFIG_CHANGE_BUFFER_PERIOD))
} }
override fun getGroupAuth(groupId: AccountId): SwarmAuth? { override fun getGroupAuth(groupId: AccountId): SwarmAuth? {
@ -681,3 +549,141 @@ private fun MutableContacts.initFrom(storage: StorageProtocol) {
set(contactInfo) set(contactInfo)
} }
} }
private class UserConfigsImpl(
userEd25519SecKey: ByteArray,
private val userAccountId: AccountId,
private val configDatabase: ConfigDatabase,
storage: StorageProtocol,
threadDb: ThreadDatabase,
contactsDump: ByteArray? = configDatabase.retrieveConfigAndHashes(
ConfigDatabase.CONTACTS_VARIANT,
userAccountId.hexString
),
userGroupsDump: ByteArray? = configDatabase.retrieveConfigAndHashes(
ConfigDatabase.USER_GROUPS_VARIANT,
userAccountId.hexString
),
userProfileDump: ByteArray? = configDatabase.retrieveConfigAndHashes(
ConfigDatabase.USER_PROFILE_VARIANT,
userAccountId.hexString
),
convoInfoDump: ByteArray? = configDatabase.retrieveConfigAndHashes(
ConfigDatabase.CONVO_INFO_VARIANT,
userAccountId.hexString
)
) : MutableUserConfigs {
override val contacts = Contacts(
ed25519SecretKey = userEd25519SecKey,
initialDump = contactsDump,
)
override val userGroups = UserGroupsConfig(
ed25519SecretKey = userEd25519SecKey,
initialDump = userGroupsDump
)
override val userProfile = UserProfile(
ed25519SecretKey = userEd25519SecKey,
initialDump = userProfileDump
)
override val convoInfoVolatile = ConversationVolatileConfig(
ed25519SecretKey = userEd25519SecKey,
initialDump = convoInfoDump,
)
init {
if (contactsDump == null) {
contacts.initFrom(storage)
}
if (userGroupsDump == null) {
userGroups.initFrom(storage)
}
if (userProfileDump == null) {
userProfile.initFrom(storage)
}
if (convoInfoDump == null) {
convoInfoVolatile.initFrom(storage, threadDb)
}
}
/**
* Persists the config if it is dirty and returns the list of classes that were persisted
*/
fun persistIfDirty(clock: SnodeClock): Boolean {
return sequenceOf(
contacts to ConfigDatabase.CONTACTS_VARIANT,
userGroups to ConfigDatabase.USER_GROUPS_VARIANT,
userProfile to ConfigDatabase.USER_PROFILE_VARIANT,
convoInfoVolatile to ConfigDatabase.CONVO_INFO_VARIANT
).fold(false) { acc, (config, variant) ->
if (config.needsDump()) {
configDatabase.storeConfig(
variant = variant,
publicKey = userAccountId.hexString,
data = config.dump(),
timestamp = clock.currentTimeMills()
)
true
} else {
acc
}
}
}
}
private class GroupConfigsImpl(
userEd25519SecKey: ByteArray,
private val groupAccountId: AccountId,
groupAdminKey: ByteArray?,
private val configDatabase: ConfigDatabase
) : MutableGroupConfigs {
override val groupInfo = GroupInfoConfig(
groupPubKey = groupAccountId.pubKeyBytes,
groupAdminKey = groupAdminKey,
initialDump = configDatabase.retrieveConfigAndHashes(
ConfigDatabase.INFO_VARIANT,
groupAccountId.hexString
)
)
override val groupMembers = GroupMembersConfig(
groupPubKey = groupAccountId.pubKeyBytes,
groupAdminKey = groupAdminKey,
initialDump = configDatabase.retrieveConfigAndHashes(
ConfigDatabase.MEMBER_VARIANT,
groupAccountId.hexString
)
)
override val groupKeys = GroupKeysConfig(
userSecretKey = userEd25519SecKey,
groupPublicKey = groupAccountId.pubKeyBytes,
groupAdminKey = groupAdminKey,
initialDump = configDatabase.retrieveConfigAndHashes(
ConfigDatabase.KEYS_VARIANT,
groupAccountId.hexString
),
info = groupInfo,
members = groupMembers
)
fun dumpIfNeeded(clock: SnodeClock): Boolean {
if (groupInfo.needsDump() || groupMembers.needsDump() || groupKeys.needsDump()) {
configDatabase.storeGroupConfigs(
publicKey = groupAccountId.hexString,
keysConfig = groupKeys.dump(),
infoConfig = groupInfo.dump(),
memberConfig = groupMembers.dump(),
timestamp = clock.currentTimeMills()
)
return true
}
return false
}
override fun rekey() {
groupKeys.rekey(groupInfo.pointer, groupMembers.pointer)
}
}

View File

@ -7,6 +7,7 @@ import network.loki.messenger.libsession_util.util.GroupInfo
import org.session.libsession.database.StorageProtocol import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.groups.GroupManagerV2 import org.session.libsession.messaging.groups.GroupManagerV2
import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller
import org.session.libsession.snode.SnodeClock
import org.session.libsignal.database.LokiAPIDatabaseProtocol import org.session.libsignal.database.LokiAPIDatabaseProtocol
import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.AccountId
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
@ -18,6 +19,7 @@ class PollerFactory(
private val groupManagerV2: Lazy<GroupManagerV2>, private val groupManagerV2: Lazy<GroupManagerV2>,
private val storage: Lazy<StorageProtocol>, private val storage: Lazy<StorageProtocol>,
private val lokiApiDatabase: LokiAPIDatabaseProtocol, private val lokiApiDatabase: LokiAPIDatabaseProtocol,
private val clock: SnodeClock,
) { ) {
private val pollers = ConcurrentHashMap<AccountId, ClosedGroupPoller>() private val pollers = ConcurrentHashMap<AccountId, ClosedGroupPoller>()
@ -39,6 +41,7 @@ class PollerFactory(
groupManagerV2 = groupManagerV2.get(), groupManagerV2 = groupManagerV2.get(),
storage = storage.get(), storage = storage.get(),
lokiApiDatabase = lokiApiDatabase, lokiApiDatabase = lokiApiDatabase,
clock = clock,
) )
} }
} }

View File

@ -45,13 +45,15 @@ object SessionUtilModule {
configFactory: ConfigFactory, configFactory: ConfigFactory,
storage: Lazy<StorageProtocol>, storage: Lazy<StorageProtocol>,
groupManagerV2: Lazy<GroupManagerV2>, groupManagerV2: Lazy<GroupManagerV2>,
lokiApiDatabase: LokiAPIDatabaseProtocol) = PollerFactory( lokiApiDatabase: LokiAPIDatabaseProtocol,
clock: SnodeClock) = PollerFactory(
scope = coroutineScope, scope = coroutineScope,
executor = dispatcher, executor = dispatcher,
configFactory = configFactory, configFactory = configFactory,
groupManagerV2 = groupManagerV2, groupManagerV2 = groupManagerV2,
storage = storage, storage = storage,
lokiApiDatabase = lokiApiDatabase, lokiApiDatabase = lokiApiDatabase,
clock = clock,
) )
@Provides @Provides

View File

@ -554,7 +554,8 @@ class GroupManagerV2Impl @Inject constructor(
// this will fail the first couple of times :) // this will fail the first couple of times :)
MessageSender.send( MessageSender.send(
responseMessage, responseMessage,
Address.fromSerialized(group.groupAccountId.hexString) Destination.ClosedGroup(group.groupAccountId.hexString),
isSyncMessage = false
) )
} else { } else {
// If we are invited as admin, we can just update the group info ourselves // If we are invited as admin, we can just update the group info ourselves
@ -756,7 +757,7 @@ class GroupManagerV2Impl @Inject constructor(
} }
storage.insertIncomingInfoMessage( storage.insertIncomingInfoMessage(
context = MessagingModuleConfiguration.shared.context, context = application,
senderPublicKey = userId, senderPublicKey = userId,
groupID = groupId.hexString, groupID = groupId.hexString,
type = SignalServiceGroup.Type.KICKED, type = SignalServiceGroup.Type.KICKED,

@ -1 +1 @@
Subproject commit 995e22dcbf08b3cb9e2ad595859e4cd9a4ed8776 Subproject commit f649dec5d6a38365e3add8fe0b4c159b2ffe19ac

View File

@ -296,3 +296,11 @@ Java_network_loki_messenger_libsession_1util_GroupKeysConfig_currentGeneration(J
auto ptr = ptrToKeys(env, thiz); auto ptr = ptrToKeys(env, thiz);
return ptr->current_generation(); return ptr->current_generation();
} }
extern "C"
JNIEXPORT jboolean JNICALL
Java_network_loki_messenger_libsession_1util_GroupKeysConfig_admin(JNIEnv *env, jobject thiz) {
std::lock_guard lock{util::util_mutex_};
auto ptr = ptrToKeys(env, thiz);
return ptr->admin();
}

View File

@ -500,6 +500,7 @@ class GroupKeysConfig private constructor(pointer: Long): ConfigSig(pointer), Mu
external override fun subAccountSign(message: ByteArray, signingValue: ByteArray): SwarmAuth external override fun subAccountSign(message: ByteArray, signingValue: ByteArray): SwarmAuth
external override fun currentGeneration(): Int external override fun currentGeneration(): Int
external fun admin(): Boolean
data class SwarmAuth( data class SwarmAuth(
val subAccount: String, val subAccount: String,

View File

@ -143,7 +143,7 @@ class RemoveGroupMemberHandler @Inject constructor(
pendingRemovals to (calls as List<SnodeAPI.SnodeBatchRequestInfo>) pendingRemovals to (calls as List<SnodeAPI.SnodeBatchRequestInfo>)
} }
if (batchCalls.isEmpty()) { if (pendingRemovals.isEmpty() || batchCalls.isEmpty()) {
return return
} }

View File

@ -18,10 +18,12 @@ import org.session.libsession.messaging.jobs.MessageReceiveParameters
import org.session.libsession.messaging.messages.Destination import org.session.libsession.messaging.messages.Destination
import org.session.libsession.snode.RawResponse import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeClock
import org.session.libsession.snode.model.RetrieveMessageResponse import org.session.libsession.snode.model.RetrieveMessageResponse
import org.session.libsession.snode.utilities.await import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.ConfigMessage import org.session.libsession.utilities.ConfigMessage
import org.session.libsession.utilities.getClosedGroup
import org.session.libsignal.database.LokiAPIDatabaseProtocol import org.session.libsignal.database.LokiAPIDatabaseProtocol
import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Log
@ -38,6 +40,7 @@ class ClosedGroupPoller(
private val groupManagerV2: GroupManagerV2, private val groupManagerV2: GroupManagerV2,
private val storage: StorageProtocol, private val storage: StorageProtocol,
private val lokiApiDatabase: LokiAPIDatabaseProtocol, private val lokiApiDatabase: LokiAPIDatabaseProtocol,
private val clock: SnodeClock,
) { ) {
companion object { companion object {
private const val POLL_INTERVAL = 3_000L private const val POLL_INTERVAL = 3_000L
@ -112,9 +115,7 @@ class ClosedGroupPoller(
} }
} }
val adminKey = requireNotNull(configFactoryProtocol.withUserConfigs { val adminKey = requireNotNull(configFactoryProtocol.getClosedGroup(closedGroupSessionId)) {
it.userGroups.getClosedGroup(closedGroupSessionId.hexString)
}) {
"Group doesn't exist" "Group doesn't exist"
}.adminKey }.adminKey
@ -135,7 +136,7 @@ class ClosedGroupPoller(
maxSize = null, maxSize = null,
), ),
RetrieveMessageResponse::class.java RetrieveMessageResponse::class.java
) ).messages.filterNotNull()
} }
if (configHashesToExtends.isNotEmpty() && adminKey != null) { if (configHashesToExtends.isNotEmpty() && adminKey != null) {
@ -146,7 +147,7 @@ class ClosedGroupPoller(
SnodeAPI.buildAuthenticatedAlterTtlBatchRequest( SnodeAPI.buildAuthenticatedAlterTtlBatchRequest(
messageHashes = configHashesToExtends.toList(), messageHashes = configHashesToExtends.toList(),
auth = groupAuth, auth = groupAuth,
newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds, newExpiry = clock.currentTimeMills() + 14.days.inWholeMilliseconds,
extend = true extend = true
), ),
) )
@ -191,7 +192,7 @@ class ClosedGroupPoller(
maxSize = null, maxSize = null,
), ),
responseType = RetrieveMessageResponse::class.java responseType = RetrieveMessageResponse::class.java
) ).messages.filterNotNull()
} }
} }
@ -238,23 +239,27 @@ class ClosedGroupPoller(
} }
private fun RetrieveMessageResponse.Message.toConfigMessage(): ConfigMessage { private fun RetrieveMessageResponse.Message.toConfigMessage(): ConfigMessage {
return ConfigMessage(hash, data, timestamp) return ConfigMessage(hash, data, timestamp ?: clock.currentTimeMills())
} }
private fun saveLastMessageHash(snode: Snode, body: RetrieveMessageResponse, namespace: Int) { private fun saveLastMessageHash(
if (body.messages.isNotEmpty()) { snode: Snode,
messages: List<RetrieveMessageResponse.Message>,
namespace: Int
) {
if (messages.isNotEmpty()) {
lokiApiDatabase.setLastMessageHashValue( lokiApiDatabase.setLastMessageHashValue(
snode = snode, snode = snode,
publicKey = closedGroupSessionId.hexString, publicKey = closedGroupSessionId.hexString,
newValue = body.messages.last().hash, newValue = messages.last().hash,
namespace = namespace namespace = namespace
) )
} }
} }
private suspend fun handleRevoked(body: RetrieveMessageResponse) { private suspend fun handleRevoked(messages: List<RetrieveMessageResponse.Message>) {
body.messages.forEach { msg -> messages.forEach { msg ->
val decoded = configFactoryProtocol.maybeDecryptForUser( val decoded = configFactoryProtocol.decryptForUser(
msg.data, msg.data,
Sodium.KICKED_DOMAIN, Sodium.KICKED_DOMAIN,
closedGroupSessionId, closedGroupSessionId,
@ -284,26 +289,26 @@ class ClosedGroupPoller(
} }
private fun handleGroupConfigMessages( private fun handleGroupConfigMessages(
keysResponse: RetrieveMessageResponse, keysResponse: List<RetrieveMessageResponse.Message>,
infoResponse: RetrieveMessageResponse, infoResponse: List<RetrieveMessageResponse.Message>,
membersResponse: RetrieveMessageResponse membersResponse: List<RetrieveMessageResponse.Message>
) { ) {
if (keysResponse.messages.isEmpty() && infoResponse.messages.isEmpty() && membersResponse.messages.isEmpty()) { if (keysResponse.isEmpty() && infoResponse.isEmpty() && membersResponse.isEmpty()) {
return return
} }
Log.d( Log.d(
TAG, "Handling group config messages(" + TAG, "Handling group config messages(" +
"info = ${infoResponse.messages.size}, " + "info = ${infoResponse.size}, " +
"keys = ${keysResponse.messages.size}, " + "keys = ${keysResponse.size}, " +
"members = ${membersResponse.messages.size})" "members = ${membersResponse.size})"
) )
configFactoryProtocol.mergeGroupConfigMessages( configFactoryProtocol.mergeGroupConfigMessages(
groupId = closedGroupSessionId, groupId = closedGroupSessionId,
keys = keysResponse.messages.map { it.toConfigMessage() }, keys = keysResponse.map { it.toConfigMessage() },
info = infoResponse.messages.map { it.toConfigMessage() }, info = infoResponse.map { it.toConfigMessage() },
members = membersResponse.messages.map { it.toConfigMessage() }, members = membersResponse.map { it.toConfigMessage() },
) )
} }
@ -314,6 +319,7 @@ class ClosedGroupPoller(
snode = snode, snode = snode,
publicKey = closedGroupSessionId.hexString, publicKey = closedGroupSessionId.hexString,
decrypt = it.groupKeys::decrypt, decrypt = it.groupKeys::decrypt,
namespace = Namespace.CLOSED_GROUP_MESSAGES(),
) )
} }
@ -331,7 +337,7 @@ class ClosedGroupPoller(
} }
if (messages.isNotEmpty()) { if (messages.isNotEmpty()) {
Log.d(TAG, "namespace for messages rx count: ${messages.size}") Log.d(TAG, "Received and handled ${messages.size} group messages")
} }
} }
} }

View File

@ -48,7 +48,6 @@ import org.session.libsignal.utilities.Snode
import org.session.libsignal.utilities.prettifiedDescription import org.session.libsignal.utilities.prettifiedDescription
import org.session.libsignal.utilities.retryIfNeeded import org.session.libsignal.utilities.retryIfNeeded
import org.session.libsignal.utilities.retryWithUniformInterval import org.session.libsignal.utilities.retryWithUniformInterval
import java.util.Date
import java.util.Locale import java.util.Locale
import kotlin.collections.component1 import kotlin.collections.component1
import kotlin.collections.component2 import kotlin.collections.component2
@ -989,7 +988,7 @@ object SnodeAPI {
} }
else Pair(MessageWrapper.unwrap(data), rawMessageAsJSON["hash"] as? String) else Pair(MessageWrapper.unwrap(data), rawMessageAsJSON["hash"] as? String)
} catch (e: Exception) { } catch (e: Exception) {
Log.d("Loki", "Failed to unwrap data for message: ${rawMessage.prettifiedDescription()}.") Log.d("Loki", "Failed to unwrap data for message: ${rawMessage.prettifiedDescription()}.", e)
null null
} }
} else { } else {

View File

@ -1,7 +1,11 @@
package org.session.libsession.snode.model package org.session.libsession.snode.model
import android.util.Base64
import com.fasterxml.jackson.annotation.JsonCreator import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.databind.util.StdConverter
data class StoreMessageResponse @JsonCreator constructor( data class StoreMessageResponse @JsonCreator constructor(
@JsonProperty("hash") val hash: String, @JsonProperty("hash") val hash: String,
@ -9,12 +13,29 @@ data class StoreMessageResponse @JsonCreator constructor(
) )
class RetrieveMessageResponse @JsonCreator constructor( class RetrieveMessageResponse @JsonCreator constructor(
@JsonProperty("messages") val messages: List<Message>, @JsonProperty("messages")
// Apply converter to the element so that if one of the message fails to deserialize, it will
// be a null value instead of failing the whole list.
@JsonDeserialize(contentConverter = RetrieveMessageConverter::class)
val messages: List<Message?>,
) { ) {
class Message @JsonCreator constructor( class Message(
@JsonProperty("hash") val hash: String, val hash: String,
@JsonProperty("t") val timestamp: Long, val timestamp: Long?,
// Jackson is able to deserialize byte arrays from base64 strings val data: ByteArray,
@JsonProperty("data") val data: ByteArray,
) )
} }
internal class RetrieveMessageConverter : StdConverter<JsonNode, RetrieveMessageResponse.Message?>() {
override fun convert(value: JsonNode?): RetrieveMessageResponse.Message? {
value ?: return null
val hash = value.get("hash")?.asText()?.takeIf { it.isNotEmpty() } ?: return null
val timestamp = value.get("t")?.asLong()?.takeIf { it > 0 }
val data = runCatching {
Base64.decode(value.get("data")?.asText().orEmpty(), Base64.DEFAULT)
}.getOrNull() ?: return null
return RetrieveMessageResponse.Message(hash, timestamp, data)
}
}

View File

@ -3,7 +3,6 @@ package org.session.libsession.utilities
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.withTimeoutOrNull import kotlinx.coroutines.withTimeoutOrNull
import network.loki.messenger.libsession_util.MutableConfig import network.loki.messenger.libsession_util.MutableConfig
@ -48,9 +47,9 @@ interface ConfigFactoryProtocol {
fun getGroupAuth(groupId: AccountId): SwarmAuth? fun getGroupAuth(groupId: AccountId): SwarmAuth?
fun removeGroup(groupId: AccountId) fun removeGroup(groupId: AccountId)
fun maybeDecryptForUser(encoded: ByteArray, fun decryptForUser(encoded: ByteArray,
domain: String, domain: String,
closedGroupSessionId: AccountId): ByteArray? closedGroupSessionId: AccountId): ByteArray?
fun mergeGroupConfigMessages( fun mergeGroupConfigMessages(
groupId: AccountId, groupId: AccountId,