diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index b627e3ff29..5062783dc2 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -144,7 +144,7 @@ public class ApplicationContext extends Application implements DefaultLifecycleO private TypingStatusRepository typingStatusRepository; private TypingStatusSender typingStatusSender; private ReadReceiptManager readReceiptManager; - private ProfileManager profileManager; + public MessageNotifier messageNotifier = null; public Poller poller = null; public Broadcaster broadcaster = null; @@ -166,6 +166,7 @@ public class ApplicationContext extends Application implements DefaultLifecycleO PushRegistrationHandler pushRegistrationHandler; @Inject TokenFetcher tokenFetcher; @Inject GroupManagerV2 groupManagerV2; + @Inject SSKEnvironment.ProfileManagerProtocol profileManager; CallMessageProcessor callMessageProcessor; MessagingModuleConfiguration messagingModuleConfiguration; @@ -268,9 +269,8 @@ public class ApplicationContext extends Application implements DefaultLifecycleO initializeTypingStatusRepository(); initializeTypingStatusSender(); initializeReadReceiptManager(); - initializeProfileManager(); initializePeriodicTasks(); - SSKEnvironment.Companion.configure(getTypingStatusRepository(), getReadReceiptManager(), getProfileManager(), messageNotifier, getExpiringMessageManager()); + SSKEnvironment.Companion.configure(getTypingStatusRepository(), getReadReceiptManager(), profileManager, messageNotifier, getExpiringMessageManager()); initializeWebRtc(); initializeBlobProvider(); resubmitProfilePictureIfNeeded(); @@ -368,9 +368,6 @@ public class ApplicationContext extends Application implements DefaultLifecycleO return readReceiptManager; } - public ProfileManager getProfileManager() { - return profileManager; - } public boolean isAppVisible() { return isAppVisible; @@ -426,10 +423,6 @@ public class ApplicationContext extends Application implements DefaultLifecycleO this.readReceiptManager = new ReadReceiptManager(); } - private void initializeProfileManager() { - this.profileManager = new ProfileManager(this, configFactory); - } - private void initializeTypingStatusSender() { this.typingStatusSender = new TypingStatusSender(this); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModel.kt index 44c31b4ec9..725806e0c0 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModel.kt @@ -15,7 +15,6 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.update import kotlinx.coroutines.launch -import kotlinx.coroutines.withContext import network.loki.messenger.R import network.loki.messenger.libsession_util.util.GroupMember import org.session.libsession.database.MessageDataProvider @@ -36,7 +35,6 @@ import org.thoughtcrime.securesms.database.ThreadDatabase import org.thoughtcrime.securesms.database.model.MessageRecord import org.thoughtcrime.securesms.database.model.MmsMessageRecord import org.thoughtcrime.securesms.repository.ConversationRepository -import org.thoughtcrime.securesms.util.ConfigurationMessageUtilities import java.util.UUID class ConversationViewModel( @@ -47,7 +45,6 @@ class ConversationViewModel( private val messageDataProvider: MessageDataProvider, private val groupDb: GroupDatabase, private val threadDb: ThreadDatabase, - private val appContext: Context, ) : ViewModel() { val showSendAfterApprovalText: Boolean @@ -348,10 +345,6 @@ class ConversationViewModel( _uiState.update { it.copy(messageRequestState = MessageRequestUiState.Invisible) } - - withContext(Dispatchers.IO) { - ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded(appContext) - } } .onFailure { showMessage("Couldn't accept message request due to error: $it") @@ -362,10 +355,14 @@ class ConversationViewModel( } } - fun declineMessageRequest() { + fun declineMessageRequest() = viewModelScope.launch { repository.declineMessageRequest(threadId, recipient!!) - ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded(appContext) - _uiState.update { it.copy(shouldExit = true) } + .onSuccess { + _uiState.update { it.copy(shouldExit = true) } + } + .onFailure { + showMessage("Couldn't decline message request due to error: $it") + } } private fun showMessage(message: String) { @@ -456,7 +453,6 @@ class ConversationViewModel( messageDataProvider = messageDataProvider, groupDb = groupDb, threadDb = threadDb, - appContext = context, ) as T } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index 609da9b67f..edef9c20f2 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -1079,137 +1079,6 @@ open class Storage( DatabaseComponent.get(context).groupDatabase().create(groupId, title, members, avatar, relay, admins, formationTimestamp) } - override fun createNewGroup(groupName: String, groupDescription: String, members: Set): Optional { - val userGroups = configFactory.userGroups ?: return Optional.absent() - val convoVolatile = configFactory.convoVolatile ?: return Optional.absent() - val ourSessionId = getUserPublicKey() ?: return Optional.absent() - - val groupCreationTimestamp = SnodeAPI.nowWithOffset - - val group = userGroups.createGroup() - val adminKey = checkNotNull(group.adminKey) { - "Admin key is null for new group creation." - } - - userGroups.set(group) - val groupInfo = configFactory.getGroupInfoConfig(group.groupAccountId) ?: return Optional.absent() - val groupMembers = configFactory.getGroupMemberConfig(group.groupAccountId) ?: return Optional.absent() - - with (groupInfo) { - setName(groupName) - setDescription(groupDescription) - } - - groupMembers.set( - LibSessionGroupMember(ourSessionId, getUserProfile().displayName, admin = true) - ) - - members.forEach { groupMembers.set(LibSessionGroupMember(it.accountID, it.name).setInvited()) } - - val groupKeys = configFactory.constructGroupKeysConfig(group.groupAccountId, - info = groupInfo, - members = groupMembers) ?: return Optional.absent() - - // Manually re-key to prevent issue with linked admin devices - groupKeys.rekey(groupInfo, groupMembers) - - val newGroupRecipient = group.groupAccountId.hexString - val configTtl = 14 * 24 * 60 * 60 * 1000L - // Test the sending - val keyPush = groupKeys.pendingConfig() ?: return Optional.absent() - - val groupAdminSigner = OwnedSwarmAuth.ofClosedGroup(group.groupAccountId, adminKey) - - val keysSnodeMessage = SnodeMessage( - newGroupRecipient, - Base64.encodeBytes(keyPush), - configTtl, - groupCreationTimestamp - ) - val keysBatchInfo = SnodeAPI.buildAuthenticatedStoreBatchInfo( - groupKeys.namespace(), - keysSnodeMessage, - groupAdminSigner - ) - - val (infoPush, infoSeqNo) = groupInfo.push() - val infoSnodeMessage = SnodeMessage( - newGroupRecipient, - Base64.encodeBytes(infoPush), - configTtl, - groupCreationTimestamp - ) - val infoBatchInfo = SnodeAPI.buildAuthenticatedStoreBatchInfo( - groupInfo.namespace(), - infoSnodeMessage, - groupAdminSigner - ) - - val (memberPush, memberSeqNo) = groupMembers.push() - val memberSnodeMessage = SnodeMessage( - newGroupRecipient, - Base64.encodeBytes(memberPush), - configTtl, - groupCreationTimestamp - ) - val memberBatchInfo = SnodeAPI.buildAuthenticatedStoreBatchInfo( - groupMembers.namespace(), - memberSnodeMessage, - groupAdminSigner - ) - - try { - val snode = SnodeAPI.getSingleTargetSnode(newGroupRecipient).get() - val response = SnodeAPI.getRawBatchResponse( - snode, - newGroupRecipient, - listOf(keysBatchInfo, infoBatchInfo, memberBatchInfo), - true - ).get() - - @Suppress("UNCHECKED_CAST") - val responseList = (response["results"] as List) - - val keyResponse = responseList[0] - val keyHash = (keyResponse["body"] as Map)["hash"] as String - val keyTimestamp = (keyResponse["body"] as Map)["t"] as Long - val infoResponse = responseList[1] - val infoHash = (infoResponse["body"] as Map)["hash"] as String - val memberResponse = responseList[2] - val memberHash = (memberResponse["body"] as Map)["hash"] as String - // TODO: check response success - groupKeys.loadKey(keyPush, keyHash, keyTimestamp, groupInfo, groupMembers) - groupInfo.confirmPushed(infoSeqNo, infoHash) - groupMembers.confirmPushed(memberSeqNo, memberHash) - - configFactory.saveGroupConfigs(groupKeys, groupInfo, groupMembers) // now check poller to be all - convoVolatile.set(Conversation.ClosedGroup(newGroupRecipient, groupCreationTimestamp, false)) - ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded(context) - val groupRecipient = Recipient.from(context, fromSerialized(newGroupRecipient), false) - SSKEnvironment.shared.profileManager.setName(context, groupRecipient, groupInfo.getName()) - setRecipientApprovedMe(groupRecipient, true) - setRecipientApproved(groupRecipient, true) - Log.d("Group Config", "Saved group config for $newGroupRecipient") - pollerFactory.updatePollers() - - val memberArray = members.map(Contact::accountID).toTypedArray() - val job = InviteContactsJob(group.groupAccountId.hexString, memberArray) - JobQueue.shared.add(job) - return Optional.of(groupRecipient) - } catch (e: Exception) { - Log.e("Group Config", e) - Log.e("Group Config", "Deleting group from our group") - // delete the group from user groups - userGroups.erase(group) - } finally { - groupKeys.free() - groupInfo.free() - groupMembers.free() - } - - return Optional.absent() - } - override fun createInitialConfigGroup(groupPublicKey: String, name: String, members: Map, formationTimestamp: Long, encryptionKeyPair: ECKeyPair, expirationTimer: Int) { val volatiles = configFactory.convoVolatile ?: return val userGroups = configFactory.userGroups ?: return @@ -1375,141 +1244,6 @@ open class Storage( override fun getMembers(groupPublicKey: String): List = configFactory.getGroupMemberConfig(AccountId(groupPublicKey))?.use { it.all() }?.toList() ?: emptyList() - private fun approveGroupInvite(threadId: Long, groupSessionId: AccountId) { - val groups = configFactory.userGroups ?: return - val group = groups.getClosedGroup(groupSessionId.hexString) ?: return - - configFactory.persist( - forConfigObject = groups.apply { set(group.copy(invited = false)) }, - timestamp = SnodeAPI.nowWithOffset - ) - - // Send invite response if we aren't admin. If we already have admin access, - // the group configs are already up-to-date (hence no need to reponse to the invite) - if (group.adminKey == null) { - val inviteResponse = GroupUpdateInviteResponseMessage.newBuilder() - .setIsApproved(true) - val responseData = GroupUpdateMessage.newBuilder() - .setInviteResponse(inviteResponse) - val responseMessage = GroupUpdated(responseData.build()) - clearMessages(threadId) - // this will fail the first couple of times :) - MessageSender.send(responseMessage, fromSerialized(groupSessionId.hexString)) - } else { - // Update our on member state - configFactory.withGroupConfigsOrNull(groupSessionId) { info, members, keys -> - members.get(getUserPublicKey().orEmpty())?.let { member -> - members.set(member.setPromoteSuccess().setInvited()) - } - - configFactory.saveGroupConfigs(keys, info, members) - } - } - - configFactory.persist(groups, SnodeAPI.nowWithOffset) - ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded(context) - pollerFactory.pollerFor(groupSessionId)?.start() - - // clear any group invites for this session ID (just in case there's a re-invite from an approved member after an invite from non-approved) - DatabaseComponent.get(context).lokiMessageDatabase().deleteGroupInviteReferrer(threadId) - } - - override fun respondToClosedGroupInvitation( - threadId: Long, - groupRecipient: Recipient, - approved: Boolean - ) { - val groups = configFactory.userGroups ?: return - val groupSessionId = AccountId(groupRecipient.address.serialize()) - // Whether approved or not, delete the invite - DatabaseComponent.get(context).lokiMessageDatabase().deleteGroupInviteReferrer(threadId) - if (!approved) { - groups.eraseClosedGroup(groupSessionId.hexString) - configFactory.persist(groups, SnodeAPI.nowWithOffset) - ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded(context) - deleteConversation(threadId) - return - } else { - approveGroupInvite(threadId, groupSessionId) - } - - } - - override fun addClosedGroupInvite( - groupId: AccountId, - name: String, - authData: ByteArray?, - adminKey: ByteArray?, - invitingAdmin: AccountId, - invitingMessageHash: String?, - ) { - require(authData != null || adminKey != null) { - "Must provide either authData or adminKey" - } - - val recipient = Recipient.from(context, fromSerialized(groupId.hexString), false) - val profileManager = SSKEnvironment.shared.profileManager - val groups = configFactory.userGroups ?: return - val inviteDb = DatabaseComponent.get(context).lokiMessageDatabase() - val shouldAutoApprove = getRecipientApproved(fromSerialized(invitingAdmin.hexString)) - val closedGroupInfo = GroupInfo.ClosedGroupInfo( - groupAccountId = groupId, - adminKey = adminKey, - authData = authData, - priority = PRIORITY_VISIBLE, - invited = !shouldAutoApprove, - name = name, - ) - groups.set(closedGroupInfo) - - configFactory.persist(groups, SnodeAPI.nowWithOffset) - profileManager.setName(context, recipient, name) - val groupThreadId = getOrCreateThreadIdFor(recipient.address) - setRecipientApprovedMe(recipient, true) - setRecipientApproved(recipient, shouldAutoApprove) - if (shouldAutoApprove) { - approveGroupInvite(groupThreadId, groupId) - } else { - inviteDb.addGroupInviteReferrer(groupThreadId, invitingAdmin.hexString) - insertGroupInviteControlMessage(SnodeAPI.nowWithOffset, invitingAdmin.hexString, groupId, name) - } - - val userAuth = this.userAuth - if (invitingMessageHash != null && userAuth != null) { - val batch = SnodeAPI.buildAuthenticatedDeleteBatchInfo( - auth = userAuth, - listOf(invitingMessageHash) - ) - - SnodeAPI.getSingleTargetSnode(userAuth.accountId.hexString).map { snode -> - SnodeAPI.getRawBatchResponse(snode, userAuth.accountId.hexString, listOf(batch)) - }.success { - Log.d(TAG, "Successfully deleted invite message") - }.fail { e -> - Log.e(TAG, "Error deleting invite message", e) - } - } - } - - override fun setGroupInviteCompleteIfNeeded(approved: Boolean, invitee: String, closedGroup: AccountId) { - // don't try to process invitee acceptance if we aren't admin - if (configFactory.userGroups?.getClosedGroup(closedGroup.hexString)?.hasAdminKey() != true) return - - configFactory.getGroupMemberConfig(closedGroup)?.use { groupMembers -> - val member = groupMembers.get(invitee) ?: run { - Log.e("ClosedGroup", "User wasn't in the group membership to add!") - return - } - if (!member.invitePending) return groupMembers.close() - if (approved) { - groupMembers.set(member.setAccepted()) - } else { - groupMembers.erase(member) - } - configFactory.persistGroupConfigDump(groupMembers, closedGroup, SnodeAPI.nowWithOffset) - ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded(Destination.ClosedGroup(closedGroup.hexString)) - } - } override fun getLibSessionClosedGroup(groupSessionId: String): GroupInfo.ClosedGroupInfo? { return configFactory.userGroups?.getClosedGroup(groupSessionId) @@ -1557,7 +1291,7 @@ open class Storage( mmsDB.updateInfoMessage(messageId, newMessage.toJSON()) } - private fun insertGroupInviteControlMessage(sentTimestamp: Long, senderPublicKey: String, closedGroup: AccountId, groupName: String): Long? { + override fun insertGroupInviteControlMessage(sentTimestamp: Long, senderPublicKey: String, closedGroup: AccountId, groupName: String): Long? { val updateData = UpdateMessageData(UpdateMessageData.Kind.GroupInvitation(senderPublicKey, groupName)) return insertUpdateControlMessage(updateData, sentTimestamp, senderPublicKey, closedGroup) } @@ -1610,46 +1344,6 @@ open class Storage( insertGroupInfoChange(message, closedGroupId) } - override fun handleKicked(groupAccountId: AccountId) { - pollerFactory.pollerFor(groupAccountId)?.stop() - } - - - override fun setName(groupSessionId: String, newName: String) { - val closedGroupId = AccountId(groupSessionId) - val adminKey = configFactory.userGroups?.getClosedGroup(groupSessionId)?.adminKey ?: return - if (adminKey.isEmpty()) { - return Log.e("ClosedGroup", "No admin key for group") - } - - configFactory.withGroupConfigsOrNull(closedGroupId) { info, members, keys -> - info.setName(newName) - configFactory.saveGroupConfigs(keys, info, members) - } - - val groupDestination = Destination.ClosedGroup(groupSessionId) - ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded(groupDestination) - val timestamp = SnodeAPI.nowWithOffset - val signature = SodiumUtilities.sign( - buildInfoChangeVerifier(GroupUpdateInfoChangeMessage.Type.NAME, timestamp), - adminKey - ) - - val message = GroupUpdated( - GroupUpdateMessage.newBuilder() - .setInfoChangeMessage( - GroupUpdateInfoChangeMessage.newBuilder() - .setUpdatedName(newName) - .setType(GroupUpdateInfoChangeMessage.Type.NAME) - .setAdminSignature(ByteString.copyFrom(signature)) - ) - .build() - ).apply { - sentTimestamp = timestamp - } - MessageSender.send(message, fromSerialized(groupSessionId)) - insertGroupInfoChange(message, closedGroupId) - } override fun sendGroupUpdateDeleteMessage(groupSessionId: String, messageHashes: List): Promise { val closedGroup = configFactory.userGroups?.getClosedGroup(groupSessionId) diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppModule.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppModule.kt index 90f9c03932..a2c2bacdce 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppModule.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppModule.kt @@ -11,11 +11,13 @@ import dagger.hilt.android.qualifiers.ApplicationContext import dagger.hilt.components.SingletonComponent import org.session.libsession.messaging.groups.GroupManagerV2 import org.session.libsession.utilities.AppTextSecurePreferences +import org.session.libsession.utilities.SSKEnvironment import org.session.libsession.utilities.TextSecurePreferences import org.session.libsession.utilities.Toaster import org.thoughtcrime.securesms.groups.GroupManagerV2Impl import org.thoughtcrime.securesms.repository.ConversationRepository import org.thoughtcrime.securesms.repository.DefaultConversationRepository +import org.thoughtcrime.securesms.sskenvironment.ProfileManager import javax.inject.Singleton @Module @@ -30,6 +32,9 @@ abstract class AppModule { @Binds abstract fun bindGroupManager(groupManager: GroupManagerV2Impl): GroupManagerV2 + + @Binds + abstract fun bindProfileManager(profileManager: ProfileManager): SSKEnvironment.ProfileManagerProtocol } @Module diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/CreateGroupViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/CreateGroupViewModel.kt index 04967fe91b..d7050ade31 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/CreateGroupViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/CreateGroupViewModel.kt @@ -11,6 +11,7 @@ import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import org.session.libsession.database.StorageProtocol +import org.session.libsession.messaging.groups.GroupManagerV2 import org.thoughtcrime.securesms.dependencies.ConfigFactory import javax.inject.Inject @@ -19,6 +20,7 @@ import javax.inject.Inject class CreateGroupViewModel @Inject constructor( configFactory: ConfigFactory, private val storage: StorageProtocol, + private val groupManagerV2: GroupManagerV2, ): ViewModel() { // Child view model to handle contact selection logic val selectContactsViewModel = SelectContactsViewModel( @@ -60,15 +62,25 @@ class CreateGroupViewModel @Inject constructor( mutableIsLoading.value = true - val recipient = withContext(Dispatchers.Default) { - storage.createNewGroup(groupName, "", selected) + val createResult = withContext(Dispatchers.Default) { + runCatching { + groupManagerV2.createGroup( + groupName = groupName, + groupDescription = "", + members = selected + ) + } } - if (recipient.isPresent) { - val threadId = withContext(Dispatchers.Default) { storage.getOrCreateThreadIdFor(recipient.get().address) } - mutableEvents.emit(CreateGroupEvent.NavigateToConversation(threadId)) - } else { - mutableEvents.emit(CreateGroupEvent.Error("Failed to create group")) + when (val recipient = createResult.getOrNull()) { + null -> { + mutableEvents.emit(CreateGroupEvent.Error("Failed to create group")) + + } + else -> { + val threadId = withContext(Dispatchers.Default) { storage.getOrCreateThreadIdFor(recipient.address) } + mutableEvents.emit(CreateGroupEvent.NavigateToConversation(threadId)) + } } mutableIsLoading.value = false diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt index fcda98f347..6393ee0be5 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt @@ -3,10 +3,25 @@ package org.thoughtcrime.securesms.groups import android.content.Context import com.google.protobuf.ByteString import dagger.hilt.android.qualifiers.ApplicationContext +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.withContext +import network.loki.messenger.libsession_util.ConfigBase.Companion.PRIORITY_VISIBLE +import network.loki.messenger.libsession_util.GroupInfoConfig +import network.loki.messenger.libsession_util.GroupKeysConfig +import network.loki.messenger.libsession_util.GroupMembersConfig +import network.loki.messenger.libsession_util.UserGroupsConfig +import network.loki.messenger.libsession_util.util.Conversation +import network.loki.messenger.libsession_util.util.GroupInfo +import network.loki.messenger.libsession_util.util.GroupMember import network.loki.messenger.libsession_util.util.INVITE_STATUS_SENT import network.loki.messenger.libsession_util.util.Sodium import network.loki.messenger.libsession_util.util.UserPic import org.session.libsession.database.StorageProtocol +import org.session.libsession.database.userAuth +import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.contacts.Contact import org.session.libsession.messaging.groups.GroupManagerV2 import org.session.libsession.messaging.jobs.ConfigurationSyncJob.Companion.messageInformation @@ -14,23 +29,32 @@ import org.session.libsession.messaging.jobs.InviteContactsJob import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.messages.Destination import org.session.libsession.messaging.messages.control.GroupUpdated +import org.session.libsession.messaging.messages.visible.Profile import org.session.libsession.messaging.sending_receiving.MessageSender import org.session.libsession.messaging.utilities.MessageAuthentication.buildDeleteMemberContentSignature +import org.session.libsession.messaging.utilities.MessageAuthentication.buildInfoChangeVerifier import org.session.libsession.messaging.utilities.MessageAuthentication.buildMemberChangeSignature import org.session.libsession.messaging.utilities.SodiumUtilities import org.session.libsession.snode.OwnedSwarmAuth import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeMessage import org.session.libsession.snode.model.BatchResponse +import org.session.libsession.snode.model.StoreMessageResponse import org.session.libsession.snode.utilities.await import org.session.libsession.utilities.Address +import org.session.libsession.utilities.SSKEnvironment +import org.session.libsession.utilities.recipients.Recipient import org.session.libsession.utilities.withGroupConfigsOrNull +import org.session.libsignal.messages.SignalServiceGroup import org.session.libsignal.protos.SignalServiceProtos.DataMessage import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateDeleteMemberContentMessage +import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateInfoChangeMessage +import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateInviteResponseMessage import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateMemberChangeMessage import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateMessage import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.Base64 +import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Namespace import org.thoughtcrime.securesms.database.LokiMessageDatabase import org.thoughtcrime.securesms.database.MmsSmsDatabase @@ -40,15 +64,20 @@ import org.thoughtcrime.securesms.util.ConfigurationMessageUtilities import javax.inject.Inject import javax.inject.Singleton +private const val TAG = "GroupManagerV2Impl" + @Singleton class GroupManagerV2Impl @Inject constructor( - val storage: StorageProtocol, - val configFactory: ConfigFactory, - val mmsSmsDatabase: MmsSmsDatabase, - val lokiDatabase: LokiMessageDatabase, - val pollerFactory: PollerFactory, + private val storage: StorageProtocol, + private val configFactory: ConfigFactory, + private val mmsSmsDatabase: MmsSmsDatabase, + private val lokiDatabase: LokiMessageDatabase, + private val pollerFactory: PollerFactory, + private val profileManager: SSKEnvironment.ProfileManagerProtocol, @ApplicationContext val application: Context, ) : GroupManagerV2 { + private val dispatcher = Dispatchers.Default + /** * Require admin access to a group, and return the admin key. * @@ -62,11 +91,214 @@ class GroupManagerV2Impl @Inject constructor( ?.takeIf { it.isNotEmpty() }) { "Only admin is allowed to invite members" } } + override suspend fun createGroup( + groupName: String, + groupDescription: String, + members: Set + ): Recipient = withContext(dispatcher) { + val userGroupsConfig = + requireNotNull(configFactory.userGroups) { "User groups config is not available" } + val convoVolatileConfig = + requireNotNull(configFactory.convoVolatile) { "Conversation volatile config is not available" } + val ourAccountId = + requireNotNull(storage.getUserPublicKey()) { "Our account ID is not available" } + val ourKeys = + requireNotNull(storage.getUserED25519KeyPair()) { "Our ED25519 key pair is not available" } + val ourProfile = storage.getUserProfile() + + val groupCreationTimestamp = SnodeAPI.nowWithOffset + + // Create a group in the user groups config + val group = userGroupsConfig.createGroup() + val adminKey = checkNotNull(group.adminKey) { "Admin key is null for new group creation." } + userGroupsConfig.set(group) + val groupId = group.groupAccountId + val groupAuth = OwnedSwarmAuth.ofClosedGroup(groupId, adminKey) + + try { + withNewGroupConfigs( + groupId = groupId, + userSecretKey = ourKeys.secretKey.asBytes, + groupAdminKey = adminKey + ) { infoConfig, membersConfig, keysConfig -> + // Update group's information + infoConfig.setName(groupName) + infoConfig.setDescription(groupDescription) + + // Add members + for (member in members) { + membersConfig.set( + GroupMember( + sessionId = member.accountID, + name = member.name, + profilePicture = member.profilePicture ?: UserPic.DEFAULT, + inviteStatus = INVITE_STATUS_SENT + ) + ) + } + + // Add ourselves as admin + membersConfig.set( + GroupMember( + sessionId = ourAccountId, + name = ourProfile.displayName, + profilePicture = ourProfile.profilePicture ?: UserPic.DEFAULT, + admin = true + ) + ) + + // Manually re-key to prevent issue with linked admin devices + keysConfig.rekey(infoConfig, membersConfig) + + + val configTtl = 14 * 24 * 60 * 60 * 1000L // 14 days + + // Push keys + val pendingKey = requireNotNull(keysConfig.pendingConfig()) { + "Expect pending keys data to push but got none" + } + + val pushKeys = async { + SnodeAPI.sendBatchRequest( + groupId, + SnodeAPI.buildAuthenticatedStoreBatchInfo( + namespace = keysConfig.namespace(), + message = SnodeMessage( + recipient = groupId.hexString, + data = Base64.encodeBytes(pendingKey), + ttl = configTtl, + timestamp = groupCreationTimestamp + ), + auth = groupAuth + ), + StoreMessageResponse::class.java + ) + } + + // Push info + val pushInfo = async { + val (infoPush, infoSeqNo) = infoConfig.push() + + infoSeqNo to SnodeAPI.sendBatchRequest( + groupId, + SnodeAPI.buildAuthenticatedStoreBatchInfo( + namespace = infoConfig.namespace(), + message = SnodeMessage( + recipient = groupId.hexString, + data = Base64.encodeBytes(infoPush), + ttl = configTtl, + timestamp = groupCreationTimestamp + ), + auth = groupAuth + ), + StoreMessageResponse::class.java + ) + } + + // Members push + val pushMembers = async { + val (membersPush, membersSeqNo) = membersConfig.push() + + membersSeqNo to SnodeAPI.sendBatchRequest( + groupId, + SnodeAPI.buildAuthenticatedStoreBatchInfo( + namespace = membersConfig.namespace(), + message = SnodeMessage( + recipient = groupId.hexString, + data = Base64.encodeBytes(membersPush), + ttl = configTtl, + timestamp = groupCreationTimestamp + ), + auth = groupAuth + ), + StoreMessageResponse::class.java + ) + } + + + // Wait for all the push requests to finish then update the configs + val (keyHash, keyTimestamp) = pushKeys.await() + val (infoSeqNo, infoHash) = pushInfo.await() + val (membersSeqNo, membersHash) = pushMembers.await() + + keysConfig.loadKey(pendingKey, keyHash, keyTimestamp, infoConfig, membersConfig) + infoConfig.confirmPushed(infoSeqNo, infoHash.hash) + membersConfig.confirmPushed(membersSeqNo, membersHash.hash) + + configFactory.saveGroupConfigs(keysConfig, infoConfig, membersConfig) + + // Add a new conversation into the volatile convo config and sync + convoVolatileConfig.set( + Conversation.ClosedGroup( + groupId.hexString, + groupCreationTimestamp, + false + ) + ) + ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded(application) + + val recipient = + Recipient.from(application, Address.fromSerialized(groupId.hexString), false) + + // Apply various data locally + profileManager.setName(application, recipient, groupName) + storage.setRecipientApprovedMe(recipient, true) + storage.setRecipientApproved(recipient, true) + pollerFactory.updatePollers() + + // Invite members + JobQueue.shared.add( + InviteContactsJob( + groupSessionId = groupId.hexString, + memberSessionIds = members.map { it.accountID }.toTypedArray() + ) + ) + + recipient + } + } catch (e: Exception) { + Log.e(TAG, "Failed to create group", e) + + // Remove the group from the user groups config is sufficient as a "rollback" + userGroupsConfig.erase(group) + throw e + } + } + + private suspend fun withNewGroupConfigs( + groupId: AccountId, + userSecretKey: ByteArray, + groupAdminKey: ByteArray, + block: suspend CoroutineScope.(GroupInfoConfig, GroupMembersConfig, GroupKeysConfig) -> T + ): T { + return GroupInfoConfig.newInstance( + pubKey = groupId.pubKeyBytes, + secretKey = groupAdminKey + ).use { infoConfig -> + GroupMembersConfig.newInstance( + pubKey = groupId.pubKeyBytes, + secretKey = groupAdminKey + ).use { membersConfig -> + GroupKeysConfig.newInstance( + userSecretKey = userSecretKey, + groupPublicKey = groupId.pubKeyBytes, + groupSecretKey = groupAdminKey, + info = infoConfig, + members = membersConfig + ).use { keysConfig -> + coroutineScope { + this.block(infoConfig, membersConfig, keysConfig) + } + } + } + } + } + override suspend fun inviteMembers( group: AccountId, newMembers: List, shareHistory: Boolean - ) { + ): Unit = withContext(dispatcher) { val adminKey = requireAdminAccess(group) val groupAuth = OwnedSwarmAuth.ofClosedGroup(group, adminKey) @@ -76,7 +308,10 @@ class GroupManagerV2Impl @Inject constructor( val toSet = membersConfig.get(newMember.hexString) ?.let { existing -> if (existing.inviteFailed || existing.invitePending) { - existing.copy(inviteStatus = INVITE_STATUS_SENT, supplement = shareHistory) + existing.copy( + inviteStatus = INVITE_STATUS_SENT, + supplement = shareHistory + ) } else { existing } @@ -105,16 +340,18 @@ class GroupManagerV2Impl @Inject constructor( if (shareHistory) { for (member in newMembers) { val memberKey = keysConfig.supplementFor(member.hexString) - batchRequests.add(SnodeAPI.buildAuthenticatedStoreBatchInfo( - namespace = keysConfig.namespace(), - message = SnodeMessage( - recipient = group.hexString, - data = Base64.encodeBytes(memberKey), - ttl = SnodeMessage.CONFIG_TTL, - timestamp = timestamp - ), - auth = groupAuth, - )) + batchRequests.add( + SnodeAPI.buildAuthenticatedStoreBatchInfo( + namespace = keysConfig.namespace(), + message = SnodeMessage( + recipient = group.hexString, + data = Base64.encodeBytes(memberKey), + ttl = SnodeMessage.CONFIG_TTL, + timestamp = timestamp + ), + auth = groupAuth, + ) + ) } } else { keysConfig.rekey(infoConfig, membersConfig) @@ -150,7 +387,12 @@ class GroupManagerV2Impl @Inject constructor( configFactory.saveGroupConfigs(keysConfig, infoConfig, membersConfig) // Send the invitation message to the new members - JobQueue.shared.add(InviteContactsJob(group.hexString, newMembers.map { it.hexString }.toTypedArray())) + JobQueue.shared.add( + InviteContactsJob( + group.hexString, + newMembers.map { it.hexString }.toTypedArray() + ) + ) // Send a member change message to the group val signature = SodiumUtilities.sign( @@ -216,7 +458,8 @@ class GroupManagerV2Impl @Inject constructor( } override suspend fun leaveGroup(group: AccountId, deleteOnLeave: Boolean) { - val canSendGroupMessage = configFactory.userGroups?.getClosedGroup(group.hexString)?.kicked != true + val canSendGroupMessage = + configFactory.userGroups?.getClosedGroup(group.hexString)?.kicked != true val address = Address.fromSerialized(group.hexString) if (canSendGroupMessage) { @@ -250,30 +493,50 @@ class GroupManagerV2Impl @Inject constructor( } } - override suspend fun promoteMember(group: AccountId, members: List) { + override suspend fun promoteMember(group: AccountId, members: List): Unit = withContext(dispatcher) { val adminKey = requireAdminAccess(group) configFactory.withGroupConfigsOrNull(group) { info, membersConfig, keys -> - for (member in members) { - val promoted = membersConfig.get(member.hexString)?.setPromoteSent() ?: continue - membersConfig.set(promoted) - - val message = GroupUpdated( - GroupUpdateMessage.newBuilder() - .setPromoteMessage( - DataMessage.GroupUpdatePromoteMessage.newBuilder() - .setGroupIdentitySeed(ByteString.copyFrom(adminKey)) - .setName(info.getName()) + // Promote the members by sending a message containing the admin key to each member's swarm, + // we do this concurrently and then update the group configs after all the messages are sent. + val promoteResult = members.asSequence() + .mapNotNull { membersConfig.get(it.hexString) } + .map { memberConfig -> + async { + val message = GroupUpdated( + GroupUpdateMessage.newBuilder() + .setPromoteMessage( + DataMessage.GroupUpdatePromoteMessage.newBuilder() + .setGroupIdentitySeed(ByteString.copyFrom(adminKey)) + .setName(info.getName()) + ) + .build() ) - .build() - ) - MessageSender.send(message, Address.fromSerialized(group.hexString)) + + try { + MessageSender.sendNonDurably( + message = message, + address = Address.fromSerialized(memberConfig.sessionId), + isSyncMessage = false + ).await() + + memberConfig.setPromoteSent() + } catch (ec: Exception) { + Log.e(TAG, "Failed to send promote message", ec) + memberConfig.setPromoteFailed() + } + } + } + .toList() + + for (result in promoteResult) { + membersConfig.set(result.await()) } configFactory.saveGroupConfigs(keys, info, membersConfig) } - + // Send a group update message to the group telling members someone has been promoted val groupDestination = Destination.ClosedGroup(group.hexString) ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded(groupDestination) val timestamp = SnodeAPI.nowWithOffset @@ -294,14 +557,16 @@ class GroupManagerV2Impl @Inject constructor( sentTimestamp = timestamp } - MessageSender.send(message, Address.fromSerialized(groupDestination.publicKey)) + MessageSender.send(message, Address.fromSerialized(group.hexString)) storage.insertGroupInfoChange(message, group) } - private suspend fun doRemoveMembers(group: AccountId, - removedMembers: List, - sendRemovedMessage: Boolean, - removeMemberMessages: Boolean) { + private suspend fun doRemoveMembers( + group: AccountId, + removedMembers: List, + sendRemovedMessage: Boolean, + removeMemberMessages: Boolean + ) = withContext(dispatcher) { val adminKey = requireAdminAccess(group) val groupAuth = OwnedSwarmAuth.ofClosedGroup(group, adminKey) @@ -324,7 +589,8 @@ class GroupManagerV2Impl @Inject constructor( ) this += Sodium.encryptForMultipleSimple( - messages = removedMembers.map{"${it.hexString}-${keys.currentGeneration()}".encodeToByteArray()}.toTypedArray(), + messages = removedMembers.map { "${it.hexString}-${keys.currentGeneration()}".encodeToByteArray() } + .toTypedArray(), recipients = removedMembers.map { it.pubKeyBytes }.toTypedArray(), ed25519SecretKey = adminKey, domain = Sodium.KICKED_DOMAIN @@ -345,22 +611,24 @@ class GroupManagerV2Impl @Inject constructor( val adminSignature = SodiumUtilities.sign( buildDeleteMemberContentSignature( - memberIds = removedMembers, - messageHashes = emptyList(), - timestamp = messageSendTimestamp - ), adminKey) + memberIds = removedMembers, + messageHashes = emptyList(), + timestamp = messageSendTimestamp + ), adminKey + ) this += SnodeAPI.buildAuthenticatedStoreBatchInfo( namespace = Namespace.CLOSED_GROUP_MESSAGES(), message = MessageSender.buildWrappedMessageToSnode( destination = Destination.ClosedGroup(group.hexString), - message = GroupUpdated(GroupUpdateMessage.newBuilder() - .setDeleteMemberContent( - GroupUpdateDeleteMemberContentMessage.newBuilder() - .addAllMemberSessionIds(removedMembers.map { it.hexString }) - .setAdminSignature(ByteString.copyFrom(adminSignature)) - ) - .build() + message = GroupUpdated( + GroupUpdateMessage.newBuilder() + .setDeleteMemberContent( + GroupUpdateDeleteMemberContentMessage.newBuilder() + .addAllMemberSessionIds(removedMembers.map { it.hexString }) + .setAdminSignature(ByteString.copyFrom(adminSignature)) + ) + .build() ).apply { sentTimestamp = messageSendTimestamp }, isSyncMessage = false ), @@ -370,7 +638,12 @@ class GroupManagerV2Impl @Inject constructor( } val snode = SnodeAPI.getSingleTargetSnode(group.hexString).await() - val responses = SnodeAPI.getBatchResponse(snode, group.hexString, essentialRequests, sequence = true) + val responses = SnodeAPI.getBatchResponse( + snode, + group.hexString, + essentialRequests, + sequence = true + ) responses.requireAllRequestsSuccessful("Failed to execute essential steps for removing member") @@ -403,9 +676,20 @@ class GroupManagerV2Impl @Inject constructor( this += "Sync keys config messages" to it.batch } - this += "Sync info config messages" to info.messageInformation(messagesToDelete, groupAuth).batch - this += "Sync member config messages" to members.messageInformation(messagesToDelete, groupAuth).batch - this += "Delete outdated config and member messages" to SnodeAPI.buildAuthenticatedDeleteBatchInfo(groupAuth, messagesToDelete) + this += "Sync info config messages" to info.messageInformation( + messagesToDelete, + groupAuth + ).batch + + this += "Sync member config messages" to members.messageInformation( + messagesToDelete, + groupAuth + ).batch + + this += "Delete outdated config and member messages" to SnodeAPI.buildAuthenticatedDeleteBatchInfo( + groupAuth, + messagesToDelete + ) } val response = SnodeAPI.getBatchResponse( @@ -422,7 +706,10 @@ class GroupManagerV2Impl @Inject constructor( if (sendRemovedMessage) { val timestamp = messageSendTimestamp val signature = SodiumUtilities.sign( - buildMemberChangeSignature(GroupUpdateMemberChangeMessage.Type.REMOVED, timestamp), + buildMemberChangeSignature( + GroupUpdateMemberChangeMessage.Type.REMOVED, + timestamp + ), adminKey ) @@ -447,18 +734,339 @@ class GroupManagerV2Impl @Inject constructor( ) } + override suspend fun respondToInvitation(groupId: AccountId, approved: Boolean) = withContext(dispatcher) { + val groups = requireNotNull(configFactory.userGroups) { + "User groups config is not available" + } + + val threadId = checkNotNull(storage.getThreadId(Address.fromSerialized(groupId.hexString))) { + "No thread has been created for the group" + } + + val group = requireNotNull(groups.getClosedGroup(groupId.hexString)) { + "Group must have been created into the config object before responding to an invitation" + } + + // Whether approved or not, delete the invite + lokiDatabase.deleteGroupInviteReferrer(threadId) + + if (approved) { + approveGroupInvite(groups, group, threadId) + } else { + groups.eraseClosedGroup(groupId.hexString) + storage.deleteConversation(threadId) + } + } + + private fun approveGroupInvite( + groups: UserGroupsConfig, + group: GroupInfo.ClosedGroupInfo, + threadId: Long, + ) { + val key = requireNotNull(storage.getUserPublicKey()) { + "Our account ID is not available" + } + + // Clear the invited flag of the group in the config + groups.set(group.copy(invited = false)) + configFactory.persist(forConfigObject = groups, timestamp = SnodeAPI.nowWithOffset) + ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded(application) + + if (group.adminKey == null) { + // Send an invite response to the group if we are invited as a regular member + val inviteResponse = GroupUpdateInviteResponseMessage.newBuilder() + .setIsApproved(true) + val responseData = GroupUpdateMessage.newBuilder() + .setInviteResponse(inviteResponse) + val responseMessage = GroupUpdated(responseData.build()) + storage.clearMessages(threadId) + // this will fail the first couple of times :) + MessageSender.send(responseMessage, Address.fromSerialized(group.groupAccountId.hexString)) + } else { + // If we are invited as admin, we can just update the group info ourselves + configFactory.withGroupConfigsOrNull(group.groupAccountId) { info, members, keys -> + members.get(key)?.let { member -> + members.set(member.setPromoteSuccess().setAccepted()) + + configFactory.saveGroupConfigs(keys, info, members) + } + + Unit + } + + ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded( + destination = Destination.ClosedGroup(group.groupAccountId.hexString) + ) + } + + pollerFactory.pollerFor(group.groupAccountId)?.start() + } + + override suspend fun onReceiveInvitation( + groupId: AccountId, + groupName: String, + authData: ByteArray, + inviter: AccountId, + inviteMessageHash: String? + ) = withContext(dispatcher) { + handleInvitation( + groupId = groupId, + groupName = groupName, + authDataOrAdminKey = authData, + fromPromotion = false, + inviter = inviter + ) + + // Delete the invite message remotely + if (inviteMessageHash != null) { + val auth = requireNotNull(storage.userAuth) { "No current user available" } + SnodeAPI.sendBatchRequest( + auth.accountId, + SnodeAPI.buildAuthenticatedDeleteBatchInfo(auth, listOf(inviteMessageHash)), + ) + } + } + + override suspend fun onReceivePromotion( + groupId: AccountId, + groupName: String, + adminKey: ByteArray, + promoter: AccountId, + promoteMessageHash: String? + ) = withContext(dispatcher) { + val groups = requireNotNull(configFactory.userGroups) { + "User groups config is not available" + } + + val userAuth = requireNotNull(storage.userAuth) { "No current user available" } + var group = groups.getClosedGroup(groupId.hexString) + + if (group == null) { + // If we haven't got the group in the config, it could mean that we haven't + // processed the invitation, or the invitation message is lost. We'll need to + // go through the invitation process again. + handleInvitation( + groupId = groupId, + groupName = groupName, + authDataOrAdminKey = adminKey, + fromPromotion = true, + inviter = promoter, + ) + } else { + // If we have the group in the config, we can just update the admin key + group = group.copy(adminKey = adminKey) + groups.set(group) + configFactory.persist(groups, SnodeAPI.nowWithOffset) + + // Update our promote state + configFactory.withGroupConfigsOrNull(groupId) { info, members, keys -> + members.get(userAuth.accountId.hexString)?.let { member -> + members.set(member.setPromoteSuccess()) + + configFactory.saveGroupConfigs(keys, info, members) + } + + Unit + } + + ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded( + destination = Destination.ClosedGroup(groupId.hexString) + ) + + ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded(application) + } + + // Delete the promotion message remotely + if (promoteMessageHash != null) { + SnodeAPI.sendBatchRequest( + userAuth.accountId, + SnodeAPI.buildAuthenticatedDeleteBatchInfo(userAuth, listOf(promoteMessageHash)), + ) + } + } + + /** + * Handle an invitation to a group. + * + * @param groupId the group ID + * @param groupName the group name + * @param authDataOrAdminKey the auth data or admin key. If this is an invitation, this is the auth data, if this is a promotion, this is the admin key. + * @param fromPromotion true if this is a promotion, false if this is an invitation + * @param inviter the invite message sender + * @return The newly created group info if the invitation is processed, null otherwise. + */ + private fun handleInvitation( + groupId: AccountId, + groupName: String, + authDataOrAdminKey: ByteArray, + fromPromotion: Boolean, + inviter: AccountId, + ) { + val groups = requireNotNull(configFactory.userGroups) { + "User groups config is not available" + } + + // If we have already received an invitation in the past, we should not process this one + if (groups.getClosedGroup(groupId.hexString)?.invited == true) { + return + } + + val recipient = Recipient.from(application, Address.fromSerialized(groupId.hexString), false) + + val shouldAutoApprove = storage.getRecipientApproved(Address.fromSerialized(inviter.hexString)) + val closedGroupInfo = GroupInfo.ClosedGroupInfo( + groupAccountId = groupId, + adminKey = authDataOrAdminKey.takeIf { fromPromotion }, + authData = authDataOrAdminKey.takeIf { !fromPromotion }, + priority = PRIORITY_VISIBLE, + invited = !shouldAutoApprove, + name = groupName, + ) + groups.set(closedGroupInfo) + + configFactory.persist(groups, SnodeAPI.nowWithOffset) + profileManager.setName(application, recipient, groupName) + val groupThreadId = storage.getOrCreateThreadIdFor(recipient.address) + storage.setRecipientApprovedMe(recipient, true) + storage.setRecipientApproved(recipient, shouldAutoApprove) + if (shouldAutoApprove) { + approveGroupInvite(groups, closedGroupInfo, groupThreadId) + } else { + lokiDatabase.addGroupInviteReferrer(groupThreadId, inviter.hexString) + storage.insertGroupInviteControlMessage(SnodeAPI.nowWithOffset, inviter.hexString, groupId, groupName) + } + } + + override suspend fun handleInviteResponse( + groupId: AccountId, + sender: AccountId, + approved: Boolean + ): Unit = withContext(dispatcher) { + if (!approved) { + // We should only see approved coming through + return@withContext + } + + val groups = requireNotNull(configFactory.userGroups) { + "User groups config is not available" + } + + val adminKey = groups.getClosedGroup(groupId.hexString)?.adminKey + if (adminKey == null || adminKey.isEmpty()) { + return@withContext // We don't have the admin key, we can't process the invite response + } + + configFactory.withGroupConfigsOrNull(groupId) { info, members, keys -> + val member = members.get(sender.hexString) + if (member == null) { + Log.e(TAG, "User wasn't in the group membership to add!") + return@withContext + } + + members.set(member.setAccepted()) + + configFactory.saveGroupConfigs(keys, info, members) + ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded( + Destination.ClosedGroup(groupId.hexString) + ) + } + } + + override suspend fun handleKicked(groupId: AccountId): Unit = withContext(dispatcher) { + Log.d(TAG, "We were kicked from the group, delete and stop polling") + + // Stop polling the group immediately + pollerFactory.pollerFor(groupId)?.stop() + + val userId = requireNotNull(storage.getUserPublicKey()) { "No current user available" } + val userGroups = requireNotNull(configFactory.userGroups) { "User groups config is not available" } + val group = userGroups.getClosedGroup(groupId.hexString) ?: return@withContext + + // Retrieve the group name one last time from the group info, + // as we are going to clear the keys, we won't have the chance to + // read the group name anymore. + val groupName = configFactory.getGroupInfoConfig(groupId) + ?.use { it.getName() } + ?: group.name + + userGroups.set(group.copy( + authData = null, + adminKey = null, + name = groupName + )) + + configFactory.persist(userGroups, SnodeAPI.nowWithOffset) + + storage.insertIncomingInfoMessage( + context = MessagingModuleConfiguration.shared.context, + senderPublicKey = userId, + groupID = groupId.hexString, + type = SignalServiceGroup.Type.KICKED, + name = groupName, + members = emptyList(), + admins = emptyList(), + sentTimestamp = SnodeAPI.nowWithOffset, + ) + } + + override suspend fun setName(groupId: AccountId, newName: String): Unit = withContext(dispatcher) { + val adminKey = requireAdminAccess(groupId) + + configFactory.withGroupConfigsOrNull(groupId) { info, members, keys -> + info.setName(newName) + configFactory.saveGroupConfigs(keys, info, members) + } + + val groupDestination = Destination.ClosedGroup(groupId.hexString) + ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded(groupDestination) + val timestamp = SnodeAPI.nowWithOffset + val signature = SodiumUtilities.sign( + buildInfoChangeVerifier(GroupUpdateInfoChangeMessage.Type.NAME, timestamp), + adminKey + ) + + val message = GroupUpdated( + GroupUpdateMessage.newBuilder() + .setInfoChangeMessage( + GroupUpdateInfoChangeMessage.newBuilder() + .setUpdatedName(newName) + .setType(GroupUpdateInfoChangeMessage.Type.NAME) + .setAdminSignature(ByteString.copyFrom(signature)) + ) + .build() + ).apply { + sentTimestamp = timestamp + } + + MessageSender.sendNonDurably(message, Address.fromSerialized(groupId.hexString), false).await() + storage.insertGroupInfoChange(message, groupId) + } + + private fun BatchResponse.requireAllRequestsSuccessful(errorMessage: String) { val firstError = this.results.firstOrNull { it.code != 200 } require(firstError == null) { "$errorMessage: ${firstError!!.body}" } } - private val Contact.profilePicture: UserPic? get() { - val url = this.profilePictureURL - val key = this.profilePictureEncryptionKey - return if (url != null && key != null) { - UserPic(url, key) - } else { - null + private val Contact.profilePicture: UserPic? + get() { + val url = this.profilePictureURL + val key = this.profilePictureEncryptionKey + return if (url != null && key != null) { + UserPic(url, key) + } else { + null + } + } + + private val Profile.profilePicture: UserPic? + get() { + val url = this.profilePictureURL + val key = this.profileKey + return if (url != null && key != null) { + UserPic(url, key) + } else { + null + } } - } } \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/repository/ConversationRepository.kt b/app/src/main/java/org/thoughtcrime/securesms/repository/ConversationRepository.kt index f2443057d1..d7baef07ba 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/repository/ConversationRepository.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/repository/ConversationRepository.kt @@ -12,6 +12,7 @@ import kotlinx.coroutines.withContext import network.loki.messenger.libsession_util.util.ExpiryMode import org.session.libsession.database.MessageDataProvider import org.session.libsession.database.userAuth +import org.session.libsession.messaging.groups.GroupManagerV2 import org.session.libsession.messaging.messages.Destination import org.session.libsession.messaging.messages.control.MessageRequestResponse import org.session.libsession.messaging.messages.control.UnsendRequest @@ -26,16 +27,15 @@ import org.session.libsession.utilities.Address import org.session.libsession.utilities.GroupUtil import org.session.libsession.utilities.TextSecurePreferences import org.session.libsession.utilities.recipients.Recipient +import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.toHexString import org.thoughtcrime.securesms.database.DatabaseContentProviders import org.thoughtcrime.securesms.database.DraftDatabase -import org.thoughtcrime.securesms.database.ExpirationConfigurationDatabase import org.thoughtcrime.securesms.database.LokiMessageDatabase import org.thoughtcrime.securesms.database.LokiThreadDatabase import org.thoughtcrime.securesms.database.MmsDatabase import org.thoughtcrime.securesms.database.MmsSmsDatabase -import org.thoughtcrime.securesms.database.RecipientDatabase import org.thoughtcrime.securesms.database.SessionJobDatabase import org.thoughtcrime.securesms.database.SmsDatabase import org.thoughtcrime.securesms.database.Storage @@ -69,7 +69,7 @@ interface ConversationRepository { suspend fun deleteMessageRequest(thread: ThreadRecord): Result suspend fun clearAllMessageRequests(block: Boolean): Result suspend fun acceptMessageRequest(threadId: Long, recipient: Recipient): Result - fun declineMessageRequest(threadId: Long, recipient: Recipient) + suspend fun declineMessageRequest(threadId: Long, recipient: Recipient): Result fun hasReceived(threadId: Long): Boolean fun getInvitingAdmin(threadId: Long): Recipient? } @@ -84,13 +84,12 @@ class DefaultConversationRepository @Inject constructor( private val smsDb: SmsDatabase, private val mmsDb: MmsDatabase, private val mmsSmsDb: MmsSmsDatabase, - private val recipientDb: RecipientDatabase, private val storage: Storage, private val lokiMessageDb: LokiMessageDatabase, private val sessionJobDb: SessionJobDatabase, - private val configDb: ExpirationConfigurationDatabase, private val configFactory: ConfigFactory, private val contentResolver: ContentResolver, + private val groupManager: GroupManagerV2, ) : ConversationRepository { override fun maybeGetRecipientForThreadId(threadId: Long): Recipient? { @@ -329,11 +328,8 @@ class DefaultConversationRepository @Inject constructor( } } - override suspend fun deleteMessageRequest(thread: ThreadRecord) = runCatching { - withContext(Dispatchers.Default) { - declineMessageRequest(thread.threadId, thread.recipient) - } - } + override suspend fun deleteMessageRequest(thread: ThreadRecord) + = declineMessageRequest(thread.threadId, thread.recipient) override suspend fun clearAllMessageRequests(block: Boolean) = runCatching { withContext(Dispatchers.Default) { @@ -353,7 +349,10 @@ class DefaultConversationRepository @Inject constructor( withContext(Dispatchers.Default) { storage.setRecipientApproved(recipient, true) if (recipient.isClosedGroupV2Recipient) { - storage.respondToClosedGroupInvitation(threadId, recipient, true) + groupManager.respondToInvitation( + AccountId(recipient.address.serialize()), + approved = true + ) } else { val message = MessageRequestResponse(true) MessageSender.send( @@ -369,12 +368,17 @@ class DefaultConversationRepository @Inject constructor( } } - override fun declineMessageRequest(threadId: Long, recipient: Recipient) { - sessionJobDb.cancelPendingMessageSendJobs(threadId) - if (recipient.isClosedGroupV2Recipient) { - storage.respondToClosedGroupInvitation(threadId, recipient, false) - } else { - storage.deleteConversation(threadId) + override suspend fun declineMessageRequest(threadId: Long, recipient: Recipient): Result = runCatching { + withContext(Dispatchers.Default) { + sessionJobDb.cancelPendingMessageSendJobs(threadId) + if (recipient.isClosedGroupV2Recipient) { + groupManager.respondToInvitation( + AccountId(recipient.address.serialize()), + approved = false + ) + } else { + storage.deleteConversation(threadId) + } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/sskenvironment/ProfileManager.kt b/app/src/main/java/org/thoughtcrime/securesms/sskenvironment/ProfileManager.kt index ac873d0a3b..3a6a337a28 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/sskenvironment/ProfileManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/sskenvironment/ProfileManager.kt @@ -1,6 +1,7 @@ package org.thoughtcrime.securesms.sskenvironment import android.content.Context +import dagger.hilt.android.qualifiers.ApplicationContext import network.loki.messenger.libsession_util.util.UserPic import org.session.libsession.messaging.contacts.Contact import org.session.libsession.messaging.jobs.JobQueue @@ -13,8 +14,14 @@ import org.session.libsignal.utilities.IdPrefix import org.thoughtcrime.securesms.dependencies.ConfigFactory import org.thoughtcrime.securesms.dependencies.DatabaseComponent import org.thoughtcrime.securesms.util.ConfigurationMessageUtilities +import javax.inject.Inject +import javax.inject.Singleton -class ProfileManager(private val context: Context, private val configFactory: ConfigFactory) : SSKEnvironment.ProfileManagerProtocol { +@Singleton +class ProfileManager @Inject constructor( + @ApplicationContext private val context: Context, + private val configFactory: ConfigFactory +) : SSKEnvironment.ProfileManagerProtocol { override fun setNickname(context: Context, recipient: Recipient, nickname: String?) { if (recipient.isLocalNumber) return diff --git a/app/src/test/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModelTest.kt b/app/src/test/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModelTest.kt index b474a2402f..4c3ce0a8ad 100644 --- a/app/src/test/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModelTest.kt +++ b/app/src/test/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModelTest.kt @@ -19,7 +19,6 @@ import org.mockito.kotlin.mock import org.mockito.kotlin.whenever import org.session.libsession.utilities.recipients.Recipient import org.thoughtcrime.securesms.BaseViewModelTest -import org.thoughtcrime.securesms.database.MmsDatabase import org.thoughtcrime.securesms.database.Storage import org.thoughtcrime.securesms.database.model.MessageRecord import org.thoughtcrime.securesms.repository.ConversationRepository @@ -42,8 +41,7 @@ class ConversationViewModelTest: BaseViewModelTest() { storage = storage, messageDataProvider = mock(), groupDb = mock(), - threadDb = mock(), - appContext = mock() + threadDb = mock() ) } diff --git a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt index 223f892e13..c1501d888a 100644 --- a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt @@ -170,19 +170,14 @@ interface StorageProtocol { fun updateTimestampUpdated(groupID: String, updatedTimestamp: Long) // Closed Groups - fun createNewGroup(groupName: String, groupDescription: String, members: Set): Optional fun getMembers(groupPublicKey: String): List - fun respondToClosedGroupInvitation(threadId: Long, groupRecipient: Recipient, approved: Boolean) - fun addClosedGroupInvite(groupId: AccountId, name: String, authData: ByteArray?, adminKey: ByteArray?, invitingAdmin: AccountId, invitingMessageHash: String?) - fun setGroupInviteCompleteIfNeeded(approved: Boolean, invitee: String, closedGroup: AccountId) fun getLibSessionClosedGroup(groupAccountId: String): GroupInfo.ClosedGroupInfo? fun getClosedGroupDisplayInfo(groupAccountId: String): GroupDisplayInfo? fun insertGroupInfoChange(message: GroupUpdated, closedGroup: AccountId): Long? fun insertGroupInfoLeaving(closedGroup: AccountId): Long? + fun insertGroupInviteControlMessage(sentTimestamp: Long, senderPublicKey: String, closedGroup: AccountId, groupName: String): Long? fun updateGroupInfoChange(messageId: Long, newType: UpdateMessageData.Kind) fun handleMemberLeftNotification(message: GroupUpdated, closedGroupId: AccountId) - fun handleKicked(groupAccountId: AccountId) - fun setName(groupSessionId: String, newName: String) fun sendGroupUpdateDeleteMessage(groupSessionId: String, messageHashes: List): Promise // Groups diff --git a/libsession/src/main/java/org/session/libsession/messaging/groups/GroupManagerV2.kt b/libsession/src/main/java/org/session/libsession/messaging/groups/GroupManagerV2.kt index 8eeccce3f3..8a31021ae9 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/groups/GroupManagerV2.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/groups/GroupManagerV2.kt @@ -1,6 +1,8 @@ package org.session.libsession.messaging.groups +import org.session.libsession.messaging.contacts.Contact import org.session.libsession.messaging.messages.control.GroupUpdated +import org.session.libsession.utilities.recipients.Recipient import org.session.libsignal.utilities.AccountId /** @@ -8,6 +10,12 @@ import org.session.libsignal.utilities.AccountId * removing members, promoting members, leaving groups, etc. */ interface GroupManagerV2 { + suspend fun createGroup( + groupName: String, + groupDescription: String, + members: Set + ): Recipient + suspend fun inviteMembers( group: AccountId, newMembers: List, @@ -25,4 +33,28 @@ interface GroupManagerV2 { suspend fun leaveGroup(group: AccountId, deleteOnLeave: Boolean) suspend fun promoteMember(group: AccountId, members: List) + + suspend fun onReceiveInvitation( + groupId: AccountId, + groupName: String, + authData: ByteArray, + inviter: AccountId, + inviteMessageHash: String? + ) + + suspend fun onReceivePromotion( + groupId: AccountId, + groupName: String, + adminKey: ByteArray, + promoter: AccountId, + promoteMessageHash: String? + ) + + suspend fun respondToInvitation(groupId: AccountId, approved: Boolean): Unit? + + suspend fun handleInviteResponse(groupId: AccountId, sender: AccountId, approved: Boolean) + + suspend fun handleKicked(groupId: AccountId) + + suspend fun setName(groupId: AccountId, newName: String) } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt index f0370aa4cc..3082d42d47 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt @@ -43,7 +43,6 @@ import org.session.libsession.messaging.utilities.SodiumUtilities import org.session.libsession.messaging.utilities.WebRtcUtils import org.session.libsession.snode.SnodeAPI import org.session.libsession.utilities.Address -import org.session.libsession.utilities.Address.Companion.fromSerialized import org.session.libsession.utilities.GroupRecord import org.session.libsession.utilities.GroupUtil import org.session.libsession.utilities.GroupUtil.doubleEncodeGroupID @@ -173,7 +172,7 @@ private fun MessageReceiver.handleExpirationTimerUpdate(message: ExpirationTimer val module = MessagingModuleConfiguration.shared try { - val threadId = fromSerialized(message.groupPublicKey?.let(::doubleEncodeGroupID) ?: message.sender!!) + val threadId = Address.fromSerialized(message.groupPublicKey?.let(::doubleEncodeGroupID) ?: message.sender!!) .let(module.storage::getOrCreateThreadIdFor) module.storage.setExpirationConfiguration( @@ -363,11 +362,18 @@ fun MessageReceiver.handleVisibleMessage( } // Handle group invite response if new closed group if (threadRecipient?.isClosedGroupV2Recipient == true) { - storage.setGroupInviteCompleteIfNeeded( - approved = true, - recipient.address.serialize(), - AccountId(threadRecipient.address.serialize()) - ) + GlobalScope.launch { + try { + MessagingModuleConfiguration.shared.groupManagerV2 + .handleInviteResponse( + AccountId(threadRecipient.address.serialize()), + AccountId(messageSender), + approved = true + ) + } catch (e: Exception) { + Log.e("Loki", "Failed to handle invite response", e) + } + } } // Parse quote if needed var quoteModel: QuoteModel? = null @@ -659,20 +665,25 @@ private fun handleGroupInfoChange(message: GroupUpdated, closedGroup: AccountId) } private fun handlePromotionMessage(message: GroupUpdated) { - val storage = MessagingModuleConfiguration.shared.storage val promotion = message.inner.promoteMessage val seed = promotion.groupIdentitySeed.toByteArray() val keyPair = Sodium.ed25519KeyPair(seed) val sender = message.sender!! val adminId = AccountId(sender) - storage.addClosedGroupInvite( - groupId = AccountId(IdPrefix.GROUP, keyPair.pubKey), - name = promotion.name, - authData = null, - adminKey = keyPair.secretKey, - invitingAdmin = adminId, - message.serverHash - ) + GlobalScope.launch { + try { + MessagingModuleConfiguration.shared.groupManagerV2 + .onReceivePromotion( + groupId = AccountId(IdPrefix.GROUP, keyPair.pubKey), + groupName = promotion.name, + adminKey = keyPair.secretKey, + promoter = adminId, + promoteMessageHash = message.serverHash + ) + } catch (e: Exception) { + Log.e("GroupUpdated", "Failed to handle promotion message", e) + } + } } private fun MessageReceiver.handleInviteResponse(message: GroupUpdated, closedGroup: AccountId) { @@ -680,7 +691,13 @@ private fun MessageReceiver.handleInviteResponse(message: GroupUpdated, closedGr // val profile = message // maybe we do need data to be the inner so we can access profile val storage = MessagingModuleConfiguration.shared.storage val approved = message.inner.inviteResponse.isApproved - storage.setGroupInviteCompleteIfNeeded(approved, sender, closedGroup) + GlobalScope.launch { + try { + MessagingModuleConfiguration.shared.groupManagerV2.handleInviteResponse(closedGroup, AccountId(sender), approved) + } catch (e: Exception) { + Log.e("GroupUpdated", "Failed to handle invite response", e) + } + } } private fun MessageReceiver.handleNewLibSessionClosedGroupMessage(message: GroupUpdated) { @@ -696,15 +713,20 @@ private fun MessageReceiver.handleNewLibSessionClosedGroupMessage(message: Group val sender = message.sender!! val adminId = AccountId(sender) - // add the group - storage.addClosedGroupInvite( - groupId, - invite.name, - invite.memberAuthData.toByteArray(), - null, - adminId, - message.serverHash - ) + GlobalScope.launch { + try { + MessagingModuleConfiguration.shared.groupManagerV2 + .onReceiveInvitation( + groupId = groupId, + groupName = invite.name, + authData = invite.memberAuthData.toByteArray(), + inviter = adminId, + inviteMessageHash = message.serverHash + ) + } catch (e: Exception) { + Log.e("GroupUpdated", "Failed to handle invite message", e) + } + } } /** @@ -768,7 +790,7 @@ private fun handleNewClosedGroup(sender: String, sentTimestamp: Long, groupPubli storage.updateTitle(groupID, name) storage.updateMembers(groupID, members.map { Address.fromSerialized(it) }) } else { - storage.createGroup(groupID, name, LinkedList(members.map { fromSerialized(it) }), + storage.createGroup(groupID, name, LinkedList(members.map { Address.fromSerialized(it) }), null, null, LinkedList(admins.map { Address.fromSerialized(it) }), formationTimestamp) } storage.setProfileSharing(Address.fromSerialized(groupID), true) diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt index c8f4079046..be745f66dd 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt @@ -2,6 +2,7 @@ package org.session.libsession.messaging.sending_receiving.pollers import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.Job import kotlinx.coroutines.delay import kotlinx.coroutines.isActive @@ -13,6 +14,7 @@ import network.loki.messenger.libsession_util.util.GroupInfo import network.loki.messenger.libsession_util.util.Sodium import org.session.libsession.database.StorageProtocol import org.session.libsession.messaging.MessagingModuleConfiguration +import org.session.libsession.messaging.groups.GroupManagerV2 import org.session.libsession.messaging.jobs.BatchMessageReceiveJob import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.jobs.MessageReceiveParameters @@ -38,7 +40,7 @@ class ClosedGroupPoller( private val executor: CoroutineDispatcher, private val closedGroupSessionId: AccountId, private val configFactoryProtocol: ConfigFactoryProtocol, - private val storageProtocol: StorageProtocol = MessagingModuleConfiguration.shared.storage) { + private val groupManagerV2: GroupManagerV2) { data class ParsedRawMessage( val data: ByteArray, @@ -268,40 +270,14 @@ class ClosedGroupPoller( if (Sodium.KICKED_REGEX.matches(message)) { val (sessionId, generation) = message.split("-") if (sessionId == userSessionId.hexString && generation.toInt() >= keys.currentGeneration()) { - Log.d("GroupPoller", "We were kicked from the group, delete and stop polling") - stop() - - configFactoryProtocol.userGroups?.let { userGroups -> - userGroups.getClosedGroup(closedGroupSessionId.hexString)?.let { group -> - // Retrieve the group name one last time from the group info, - // as we are going to clear the keys, we won't have the chance to - // read the group name anymore. - val groupName = configFactoryProtocol.getGroupInfoConfig(closedGroupSessionId) - ?.use { it.getName() } - ?: group.name - - userGroups.set(group.copy( - authData = null, - adminKey = null, - name = groupName - )) - - configFactoryProtocol.persist(userGroups, SnodeAPI.nowWithOffset) + GlobalScope.launch { + try { + groupManagerV2.handleKicked(closedGroupSessionId) + } catch (e: Exception) { + Log.e("GroupPoller", "Error handling kicked message: $e") } } - storageProtocol.handleKicked(closedGroupSessionId) - - MessagingModuleConfiguration.shared.storage.insertIncomingInfoMessage( - context = MessagingModuleConfiguration.shared.context, - senderPublicKey = userSessionId.hexString, - groupID = closedGroupSessionId.hexString, - type = SignalServiceGroup.Type.KICKED, - name = "", - members = emptyList(), - admins = emptyList(), - sentTimestamp = SnodeAPI.nowWithOffset, - ) } } } diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index 8d6749c9a3..46b4373a33 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -2,6 +2,8 @@ package org.session.libsession.snode +import android.os.SystemClock +import com.fasterxml.jackson.databind.JsonNode import com.goterl.lazysodium.exceptions.SodiumException import com.goterl.lazysodium.interfaces.GenericHash import com.goterl.lazysodium.interfaces.PwHash @@ -9,8 +11,18 @@ import com.goterl.lazysodium.interfaces.SecretBox import com.goterl.lazysodium.utils.Key import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.async +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.scan +import kotlinx.coroutines.launch +import kotlinx.coroutines.selects.onTimeout +import kotlinx.coroutines.selects.select import kotlinx.coroutines.withContext +import kotlinx.coroutines.withTimeoutOrNull import nl.komponents.kovenant.Promise import nl.komponents.kovenant.all import nl.komponents.kovenant.functional.bind @@ -43,6 +55,8 @@ import kotlin.collections.component1 import kotlin.collections.component2 import kotlin.collections.set import kotlin.properties.Delegates.observable +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds object SnodeAPI { internal val database: LokiAPIDatabaseProtocol @@ -115,7 +129,7 @@ object SnodeAPI { val method: String, val params: Map, @Transient - val namespace: Int? + val namespace: Int?, ) // assume signatures, pubkey and namespaces are attached in parameters if required // Internal API @@ -571,6 +585,99 @@ object SnodeAPI { } } + private data class RequestInfo( + val accountId: AccountId, + val request: SnodeBatchRequestInfo, + val responseType: Class<*>, + val callback: SendChannel>, + val requestTime: Long = SystemClock.uptimeMillis(), + ) + + private val batchedRequestsSender: SendChannel + + init { + val batchRequests = Channel() + batchedRequestsSender = batchRequests + + val batchWindowMills = 100L + + @Suppress("OPT_IN_USAGE") + GlobalScope.launch { + val batches = hashMapOf>() + + while (true) { + val batch = select?> { + // If we receive a request, add it to the batch + batchRequests.onReceive { + batches.getOrPut(it.accountId) { mutableListOf() }.add(it) + null + } + + // If we have anything in the batch, look for the one that is about to expire + // and wait for it to expire, remove it from the batches and send it for + // processing. + if (batches.isNotEmpty()) { + val earliestBatch = batches.minBy { it.value.first().requestTime } + val deadline = earliestBatch.value.first().requestTime + batchWindowMills + onTimeout( + timeMillis = (deadline - SystemClock.uptimeMillis()).coerceAtLeast(0) + ) { + batches.remove(earliestBatch.key) + } + } + } + + if (batch != null) { + launch { + val accountId = batch.first().accountId + val responses = try { + getBatchResponse( + snode = getSingleTargetSnode(accountId.hexString).await(), + publicKey = accountId.hexString, + requests = batch.map { it.request }, sequence = false + ) + } catch (e: Exception) { + for (req in batch) { + req.callback.send(Result.failure(e)) + } + return@launch + } + + for ((req, resp) in batch.zip(responses.results)) { + req.callback.send(kotlin.runCatching { + JsonUtil.fromJson(resp.body, req.responseType) + }) + } + + // Close all channels in the requests just in case we don't have paired up + // responses. + for (req in batch) { + req.callback.close() + } + } + } + } + } + } + + suspend fun sendBatchRequest( + swarmAccount: AccountId, + request: SnodeBatchRequestInfo, + responseType: Class, + ): T { + val callback = Channel>() + @Suppress("UNCHECKED_CAST") + batchedRequestsSender.send(RequestInfo(swarmAccount, request, responseType, callback as SendChannel)) + return callback.receive().getOrThrow() + } + + suspend fun sendBatchRequest( + swarmAccount: AccountId, + request: SnodeBatchRequestInfo, + ): JsonNode { + return sendBatchRequest(swarmAccount, request, JsonNode::class.java) + } + suspend fun getBatchResponse( snode: Snode, publicKey: String, @@ -697,8 +804,15 @@ object SnodeAPI { return scope.retrySuspendAsPromise(maxRetryCount) { val destination = message.recipient - val snode = getSingleTargetSnode(destination).await() - invoke(Snode.Method.SendMessage, snode, params, destination).await() + sendBatchRequest( + swarmAccount = AccountId(destination), + request = SnodeBatchRequestInfo( + method = Snode.Method.SendMessage.rawValue, + params = params, + namespace = namespace + ), + responseType = Map::class.java + ) } } diff --git a/libsession/src/main/java/org/session/libsession/snode/model/BatchResponse.kt b/libsession/src/main/java/org/session/libsession/snode/model/BatchResponse.kt index f0e9344a62..fd022b5898 100644 --- a/libsession/src/main/java/org/session/libsession/snode/model/BatchResponse.kt +++ b/libsession/src/main/java/org/session/libsession/snode/model/BatchResponse.kt @@ -2,12 +2,13 @@ package org.session.libsession.snode.model import com.fasterxml.jackson.annotation.JsonCreator import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.databind.JsonNode data class BatchResponse @JsonCreator constructor( @param:JsonProperty("results") val results: List, ) { data class Item @JsonCreator constructor( @param:JsonProperty("code") val code: Int, - @param:JsonProperty("body") val body: Map?, + @param:JsonProperty("body") val body: JsonNode, ) } diff --git a/libsession/src/main/java/org/session/libsession/snode/model/StoreMessageResponse.kt b/libsession/src/main/java/org/session/libsession/snode/model/StoreMessageResponse.kt new file mode 100644 index 0000000000..6cd05fbbc8 --- /dev/null +++ b/libsession/src/main/java/org/session/libsession/snode/model/StoreMessageResponse.kt @@ -0,0 +1,9 @@ +package org.session.libsession.snode.model + +import com.fasterxml.jackson.annotation.JsonCreator +import com.fasterxml.jackson.annotation.JsonProperty + +data class StoreMessageResponse @JsonCreator constructor( + @JsonProperty("hash") val hash: String, + @JsonProperty("t") val timestamp: Long, +) diff --git a/libsignal/src/main/java/org/session/libsignal/utilities/JsonUtil.java b/libsignal/src/main/java/org/session/libsignal/utilities/JsonUtil.java index 6625c91260..1ed3ec67f3 100644 --- a/libsignal/src/main/java/org/session/libsignal/utilities/JsonUtil.java +++ b/libsignal/src/main/java/org/session/libsignal/utilities/JsonUtil.java @@ -51,6 +51,10 @@ public class JsonUtil { return objectMapper.readValue(serialized, clazz); } + public static T fromJson(JsonNode serialized, Class clazz) throws IOException { + return objectMapper.treeToValue(serialized, clazz); + } + public static JsonNode fromJson(String serialized) throws IOException { return objectMapper.readTree(serialized); }