diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index 2f8bd1096f..a57fa7dbc3 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -39,18 +39,17 @@ import androidx.lifecycle.ProcessLifecycleOwner; import com.squareup.phrase.Phrase; import org.conscrypt.Conscrypt; -import org.jetbrains.annotations.NotNull; -import org.session.libsession.avatars.AvatarHelper; import org.session.libsession.database.MessageDataProvider; import org.session.libsession.messaging.MessagingModuleConfiguration; +import org.session.libsession.messaging.configs.ConfigSyncHandler; import org.session.libsession.messaging.groups.GroupManagerV2; +import org.session.libsession.messaging.groups.RemoveGroupMemberHandler; import org.session.libsession.messaging.notifications.TokenFetcher; import org.session.libsession.messaging.sending_receiving.notifications.MessageNotifier; import org.session.libsession.messaging.sending_receiving.pollers.LegacyClosedGroupPollerV2; import org.session.libsession.messaging.sending_receiving.pollers.Poller; import org.session.libsession.snode.SnodeModule; import org.session.libsession.utilities.Address; -import org.session.libsession.utilities.ConfigFactoryUpdateListener; import org.session.libsession.utilities.Device; import org.session.libsession.utilities.Environment; import org.session.libsession.utilities.ProfilePictureUtilities; @@ -94,7 +93,6 @@ import org.thoughtcrime.securesms.notifications.OptimizedMessageNotifier; import org.thoughtcrime.securesms.providers.BlobProvider; import org.thoughtcrime.securesms.service.ExpiringMessageManager; import org.thoughtcrime.securesms.service.KeyCachingService; -import org.thoughtcrime.securesms.sskenvironment.ProfileManager; import org.thoughtcrime.securesms.sskenvironment.ReadReceiptManager; import org.thoughtcrime.securesms.sskenvironment.TypingStatusRepository; import org.thoughtcrime.securesms.util.Broadcaster; @@ -104,12 +102,10 @@ import org.thoughtcrime.securesms.webrtc.CallMessageProcessor; import org.webrtc.PeerConnectionFactory; import org.webrtc.PeerConnectionFactory.InitializationOptions; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.security.Security; import java.util.Arrays; -import java.util.Date; import java.util.List; import java.util.Map; import java.util.Timer; @@ -119,11 +115,8 @@ import javax.inject.Inject; import dagger.hilt.EntryPoints; import dagger.hilt.android.HiltAndroidApp; -import kotlin.Unit; import network.loki.messenger.BuildConfig; import network.loki.messenger.R; -import network.loki.messenger.libsession_util.Config; -import network.loki.messenger.libsession_util.UserProfile; /** * Will be called once when the TextSecure process is created. @@ -169,6 +162,8 @@ public class ApplicationContext extends Application implements DefaultLifecycleO @Inject SSKEnvironment.ProfileManagerProtocol profileManager; CallMessageProcessor callMessageProcessor; MessagingModuleConfiguration messagingModuleConfiguration; + @Inject ConfigSyncHandler configSyncHandler; + @Inject RemoveGroupMemberHandler removeGroupMemberHandler; private volatile boolean isAppVisible; @@ -272,6 +267,8 @@ public class ApplicationContext extends Application implements DefaultLifecycleO HTTP.INSTANCE.setConnectedToNetwork(networkConstraint::isMet); pushRegistrationHandler.run(); + configSyncHandler.start(); + removeGroupMemberHandler.start(); // add our shortcut debug menu if we are not in a release build if (BuildConfig.BUILD_TYPE != "release") { @@ -355,6 +352,10 @@ public class ApplicationContext extends Application implements DefaultLifecycleO return typingStatusSender; } + public TextSecurePreferences getTextSecurePreferences() { + return textSecurePreferences; + } + public ReadReceiptManager getReadReceiptManager() { return readReceiptManager; } @@ -444,13 +445,9 @@ public class ApplicationContext extends Application implements DefaultLifecycleO private static class ProviderInitializationException extends RuntimeException { } private void setUpPollingIfNeeded() { - String userPublicKey = TextSecurePreferences.getLocalNumber(this); + String userPublicKey = textSecurePreferences.getLocalNumber(); if (userPublicKey == null) return; - if (poller != null) { - poller.setUserPublicKey(userPublicKey); - return; - } - poller = new Poller(configFactory); + poller = new Poller(configFactory, storage, lokiAPIDatabase); } public void startPollingIfNeeded() { diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppModule.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppModule.kt index a2c2bacdce..597b8e5f20 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppModule.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppModule.kt @@ -11,9 +11,12 @@ import dagger.hilt.android.qualifiers.ApplicationContext import dagger.hilt.components.SingletonComponent import org.session.libsession.messaging.groups.GroupManagerV2 import org.session.libsession.utilities.AppTextSecurePreferences +import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsession.utilities.SSKEnvironment import org.session.libsession.utilities.TextSecurePreferences import org.session.libsession.utilities.Toaster +import org.session.libsignal.database.LokiAPIDatabaseProtocol +import org.thoughtcrime.securesms.database.LokiAPIDatabase import org.thoughtcrime.securesms.groups.GroupManagerV2Impl import org.thoughtcrime.securesms.repository.ConversationRepository import org.thoughtcrime.securesms.repository.DefaultConversationRepository @@ -35,6 +38,12 @@ abstract class AppModule { @Binds abstract fun bindProfileManager(profileManager: ProfileManager): SSKEnvironment.ProfileManagerProtocol + + @Binds + abstract fun bindConfigFactory(configFactory: ConfigFactory): ConfigFactoryProtocol + + @Binds + abstract fun bindLokiAPIDatabaseProtocol(lokiAPIDatabase: LokiAPIDatabase): LokiAPIDatabaseProtocol } @Module diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt index ab8b2dd2bd..f8bec9bccb 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt @@ -1,6 +1,8 @@ package org.thoughtcrime.securesms.dependencies import android.content.Context +import dagger.Lazy +import dagger.hilt.android.qualifiers.ApplicationContext import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.MutableSharedFlow import network.loki.messenger.libsession_util.ConfigBase @@ -16,6 +18,7 @@ import network.loki.messenger.libsession_util.MutableUserProfile import network.loki.messenger.libsession_util.UserGroupsConfig import network.loki.messenger.libsession_util.UserProfile import network.loki.messenger.libsession_util.util.BaseCommunityInfo +import network.loki.messenger.libsession_util.util.ConfigPush import network.loki.messenger.libsession_util.util.Contact import network.loki.messenger.libsession_util.util.ExpiryMode import network.loki.messenger.libsession_util.util.GroupInfo @@ -28,29 +31,39 @@ import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SwarmAuth import org.session.libsession.utilities.Address import org.session.libsession.utilities.ConfigFactoryProtocol +import org.session.libsession.utilities.ConfigMessage +import org.session.libsession.utilities.ConfigPushResult import org.session.libsession.utilities.ConfigUpdateNotification import org.session.libsession.utilities.GroupConfigs import org.session.libsession.utilities.GroupUtil import org.session.libsession.utilities.MutableGroupConfigs import org.session.libsession.utilities.MutableUserConfigs +import org.session.libsession.utilities.TextSecurePreferences +import org.session.libsession.utilities.UserConfigType import org.session.libsession.utilities.UserConfigs import org.session.libsignal.crypto.ecc.DjbECPublicKey import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.Hex import org.session.libsignal.utilities.IdPrefix +import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.toHexString import org.thoughtcrime.securesms.database.ConfigDatabase +import org.thoughtcrime.securesms.database.LokiThreadDatabase import org.thoughtcrime.securesms.database.ThreadDatabase -import org.thoughtcrime.securesms.dependencies.DatabaseComponent.Companion.get import org.thoughtcrime.securesms.groups.GroupManager import java.util.concurrent.ConcurrentHashMap +import javax.inject.Inject +import javax.inject.Singleton -class ConfigFactory( - private val context: Context, +@Singleton +class ConfigFactory @Inject constructor( + @ApplicationContext private val context: Context, private val configDatabase: ConfigDatabase, private val threadDb: ThreadDatabase, - private val storage: StorageProtocol, + private val lokiThreadDatabase: LokiThreadDatabase, + private val storage: Lazy, + private val textSecurePreferences: TextSecurePreferences ) : ConfigFactoryProtocol { companion object { // This is a buffer period within which we will process messages which would result in a @@ -182,7 +195,7 @@ class ConfigFactory( members = groupMembers ) - fun persistIfDirty(): Boolean { + fun dumpIfNeeded(): Boolean { if (groupInfo.needsDump() || groupMembers.needsDump() || groupKeys.needsDump()) { configDatabase.storeGroupConfigs( publicKey = groupAccountId.hexString, @@ -197,11 +210,10 @@ class ConfigFactory( return false } - override fun loadKeys(message: ByteArray, hash: String, timestamp: Long): Boolean { - return groupKeys.loadKey(message, hash, timestamp, groupInfo.pointer, groupMembers.pointer) - } + val isDirty: Boolean + get() = groupInfo.dirty() || groupMembers.dirty() - override fun rekeys() { + override fun rekey() { groupKeys.rekey(groupInfo.pointer, groupMembers.pointer) } } @@ -211,17 +223,17 @@ class ConfigFactory( private val _configUpdateNotifications = MutableSharedFlow( extraBufferCapacity = 1, - onBufferOverflow = BufferOverflow.DROP_OLDEST + onBufferOverflow = BufferOverflow.SUSPEND ) override val configUpdateNotifications get() = _configUpdateNotifications private fun requiresCurrentUserAccountId(): AccountId = - AccountId(requireNotNull(storage.getUserPublicKey()) { + AccountId(requireNotNull(textSecurePreferences.getLocalNumber()) { "No logged in user" }) private fun requiresCurrentUserED25519SecKey(): ByteArray = - requireNotNull(storage.getUserED25519KeyPair()?.secretKey?.asBytes) { + requireNotNull(storage.get().getUserED25519KeyPair()?.secretKey?.asBytes) { "No logged in user" } @@ -233,7 +245,7 @@ class ConfigFactory( userAccountId, threadDb = threadDb, configDatabase = configDatabase, - storage = storage + storage = storage.get() ) } @@ -242,11 +254,16 @@ class ConfigFactory( } } - override fun withMutableUserConfigs(cb: (MutableUserConfigs) -> T): T { + /** + * Perform an operation on the user configs, and notify listeners if the configs were changed. + * + * @param cb A function that takes a [UserConfigsImpl] and returns a pair of the result of the operation and a boolean indicating if the configs were changed. + */ + private fun doWithMutableUserConfigs(cb: (UserConfigsImpl) -> Pair): T { return withUserConfigs { configs -> - val result = cb(configs as UserConfigsImpl) + val (result, changed) = cb(configs as UserConfigsImpl) - if (configs.persistIfDirty()) { + if (changed) { _configUpdateNotifications.tryEmit(ConfigUpdateNotification.UserConfigs) } @@ -254,6 +271,32 @@ class ConfigFactory( } } + override fun mergeUserConfigs( + userConfigType: UserConfigType, + messages: List + ) { + if (messages.isEmpty()) { + return + } + + return doWithMutableUserConfigs { configs -> + val config = when (userConfigType) { + UserConfigType.CONTACTS -> configs.contacts + UserConfigType.USER_PROFILE -> configs.userProfile + UserConfigType.CONVO_INFO_VOLATILE -> configs.convoInfoVolatile + UserConfigType.USER_GROUPS -> configs.userGroups + } + + Unit to config.merge(messages.map { it.hash to it.data }.toTypedArray()).isNotEmpty() + } + } + + override fun withMutableUserConfigs(cb: (MutableUserConfigs) -> T): T { + return doWithMutableUserConfigs { + cb(it) to it.persistIfDirty() + } + } + override fun withGroupConfigs(groupId: AccountId, cb: (GroupConfigs) -> T): T { val configs = groupConfigs.getOrPut(groupId) { val groupAdminKey = requireNotNull(withUserConfigs { @@ -275,18 +318,28 @@ class ConfigFactory( } } + private fun doWithMutableGroupConfigs(groupId: AccountId, cb: (GroupConfigsImpl) -> Pair): T { + return withGroupConfigs(groupId) { configs -> + val (result, changed) = cb(configs as GroupConfigsImpl) + + Log.d("ConfigFactory", "Group updated? $groupId: $changed") + + if (changed) { + if (!_configUpdateNotifications.tryEmit(ConfigUpdateNotification.GroupConfigsUpdated(groupId))) { + Log.e("ConfigFactory", "Unable to deliver group update notification") + } + } + + result + } + } + override fun withMutableGroupConfigs( groupId: AccountId, cb: (MutableGroupConfigs) -> T ): T { - return withGroupConfigs(groupId) { configs -> - val result = cb(configs as GroupConfigsImpl) - - if (configs.persistIfDirty()) { - _configUpdateNotifications.tryEmit(ConfigUpdateNotification.GroupConfigsUpdated(groupId)) - } - - result + return doWithMutableGroupConfigs(groupId) { + cb(it) to it.dumpIfNeeded() } } @@ -309,7 +362,7 @@ class ConfigFactory( ): ByteArray? { return Sodium.decryptForMultipleSimple( encoded = encoded, - ed25519SecretKey = requireNotNull(storage.getUserED25519KeyPair()?.secretKey?.asBytes) { + ed25519SecretKey = requireNotNull(storage.get().getUserED25519KeyPair()?.secretKey?.asBytes) { "No logged in user" }, domain = domain, @@ -317,18 +370,85 @@ class ConfigFactory( ) } + override fun mergeGroupConfigMessages( + groupId: AccountId, + keys: List, + info: List, + members: List + ) { + doWithMutableGroupConfigs(groupId) { configs -> + // Keys must be loaded first as they are used to decrypt the other config messages + val keysLoaded = keys.fold(false) { acc, msg -> + configs.groupKeys.loadKey(msg.data, msg.hash, msg.timestamp, configs.groupInfo.pointer, configs.groupMembers.pointer) || acc + } + + val infoMerged = info.isNotEmpty() && + configs.groupInfo.merge(info.map { it.hash to it.data }.toTypedArray()).isNotEmpty() + + val membersMerged = members.isNotEmpty() && + configs.groupMembers.merge(members.map { it.hash to it.data }.toTypedArray()).isNotEmpty() + + configs.dumpIfNeeded() + + Unit to (keysLoaded || infoMerged || membersMerged) + } + } + + override fun confirmUserConfigsPushed( + contacts: Pair?, + userProfile: Pair?, + convoInfoVolatile: Pair?, + userGroups: Pair? + ) { + if (contacts == null && userProfile == null && convoInfoVolatile == null && userGroups == null) { + return + } + + doWithMutableUserConfigs { configs -> + contacts?.let { (push, result) -> configs.contacts.confirmPushed(push.seqNo, result.hash) } + userProfile?.let { (push, result) -> configs.userProfile.confirmPushed(push.seqNo, result.hash) } + convoInfoVolatile?.let { (push, result) -> configs.convoInfoVolatile.confirmPushed(push.seqNo, result.hash) } + userGroups?.let { (push, result) -> configs.userGroups.confirmPushed(push.seqNo, result.hash) } + + Unit to configs.persistIfDirty() + } + } + + override fun confirmGroupConfigsPushed( + groupId: AccountId, + members: Pair?, + info: Pair?, + keysPush: ConfigPushResult? + ) { + if (members == null && info == null && keysPush == null) { + return + } + + doWithMutableGroupConfigs(groupId) { configs -> + members?.let { (push, result) -> configs.groupMembers.confirmPushed(push.seqNo, result.hash) } + info?.let { (push, result) -> configs.groupInfo.confirmPushed(push.seqNo, result.hash) } + keysPush?.let { (hash, timestamp) -> + val pendingConfig = configs.groupKeys.pendingConfig() + if (pendingConfig != null) { + configs.groupKeys.loadKey(pendingConfig, hash, timestamp, configs.groupInfo.pointer, configs.groupMembers.pointer) + } + } + + Unit to configs.dumpIfNeeded() + } + } + override fun conversationInConfig( publicKey: String?, groupPublicKey: String?, openGroupId: String?, visibleOnly: Boolean ): Boolean { - val userPublicKey = storage.getUserPublicKey() ?: return false + val userPublicKey = storage.get().getUserPublicKey() ?: return false if (openGroupId != null) { val threadId = GroupManager.getOpenGroupThreadID(openGroupId, context) - val openGroup = - get(context).lokiThreadDatabase().getOpenGroupChat(threadId) ?: return false + val openGroup = lokiThreadDatabase.getOpenGroupChat(threadId) ?: return false // Not handling the `hidden` behaviour for communities so just indicate the existence return withUserConfigs { diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/PollerFactory.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/PollerFactory.kt index c5e420afab..11ed27dddc 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/PollerFactory.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/PollerFactory.kt @@ -7,6 +7,7 @@ import network.loki.messenger.libsession_util.util.GroupInfo import org.session.libsession.database.StorageProtocol import org.session.libsession.messaging.groups.GroupManagerV2 import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller +import org.session.libsignal.database.LokiAPIDatabaseProtocol import org.session.libsignal.utilities.AccountId import java.util.concurrent.ConcurrentHashMap @@ -15,7 +16,8 @@ class PollerFactory( private val executor: CoroutineDispatcher, private val configFactory: ConfigFactory, private val groupManagerV2: Lazy, - private val storage: StorageProtocol, + private val storage: Lazy, + private val lokiApiDatabase: LokiAPIDatabaseProtocol, ) { private val pollers = ConcurrentHashMap() @@ -29,7 +31,15 @@ class PollerFactory( if (invited != false) return null return pollers.getOrPut(sessionId) { - ClosedGroupPoller(scope, executor, sessionId, configFactory, groupManagerV2.get(), storage) + ClosedGroupPoller( + scope = scope, + executor = executor, + closedGroupSessionId = sessionId, + configFactoryProtocol = configFactory, + groupManagerV2 = groupManagerV2.get(), + storage = storage.get(), + lokiApiDatabase = lokiApiDatabase, + ) } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt index d040f87b04..ea006b7196 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt @@ -14,6 +14,8 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.GlobalScope import org.session.libsession.database.StorageProtocol import org.session.libsession.messaging.groups.GroupManagerV2 +import org.session.libsession.utilities.ConfigFactoryProtocol +import org.session.libsignal.database.LokiAPIDatabaseProtocol import org.thoughtcrime.securesms.database.ConfigDatabase import org.thoughtcrime.securesms.database.ThreadDatabase import javax.inject.Named @@ -26,15 +28,6 @@ object SessionUtilModule { private const val POLLER_SCOPE = "poller_coroutine_scope" - @Provides - @Singleton - fun provideConfigFactory( - @ApplicationContext context: Context, - configDatabase: ConfigDatabase, - storageProtocol: StorageProtocol, - threadDatabase: ThreadDatabase, - ): ConfigFactory = ConfigFactory(context, configDatabase, threadDatabase, storageProtocol) - @Provides @Named(POLLER_SCOPE) fun providePollerScope(): CoroutineScope = GlobalScope @@ -49,12 +42,14 @@ object SessionUtilModule { fun providePollerFactory(@Named(POLLER_SCOPE) coroutineScope: CoroutineScope, @Named(POLLER_SCOPE) dispatcher: CoroutineDispatcher, configFactory: ConfigFactory, - storage: StorageProtocol, - groupManagerV2: Lazy) = PollerFactory( + storage: Lazy, + groupManagerV2: Lazy, + lokiApiDatabase: LokiAPIDatabaseProtocol) = PollerFactory( scope = coroutineScope, executor = dispatcher, configFactory = configFactory, groupManagerV2 = groupManagerV2, - storage = storage + storage = storage, + lokiApiDatabase = lokiApiDatabase, ) } \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/CreateGroupFragment.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/CreateGroupFragment.kt index 348c9caa57..e3b656dc5e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/CreateGroupFragment.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/CreateGroupFragment.kt @@ -7,7 +7,6 @@ import android.view.View import android.view.ViewGroup import androidx.compose.ui.platform.ComposeView import androidx.fragment.app.Fragment -import dagger.hilt.android.AndroidEntryPoint import org.thoughtcrime.securesms.conversation.start.NullStartConversationDelegate import org.thoughtcrime.securesms.conversation.start.StartConversationDelegate import org.thoughtcrime.securesms.conversation.v2.ConversationActivityV2 diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/EditGroupActivity.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/EditGroupActivity.kt index 7a86f2b553..7bb7e9c073 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/EditGroupActivity.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/EditGroupActivity.kt @@ -5,9 +5,11 @@ import android.content.Intent import android.os.Bundle import androidx.activity.compose.setContent import dagger.hilt.android.AndroidEntryPoint +import org.session.libsession.utilities.TextSecurePreferences import org.thoughtcrime.securesms.PassphraseRequiredActionBarActivity import org.thoughtcrime.securesms.groups.compose.EditGroupScreen import org.thoughtcrime.securesms.ui.theme.SessionMaterialTheme +import javax.inject.Inject @AndroidEntryPoint class EditGroupActivity: PassphraseRequiredActionBarActivity() { diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/EditGroupViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/EditGroupViewModel.kt index 482d22f004..49acc4bad5 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/EditGroupViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/EditGroupViewModel.kt @@ -8,6 +8,7 @@ import dagger.assisted.AssistedInject import dagger.hilt.android.lifecycle.HiltViewModel import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.StateFlow @@ -44,7 +45,7 @@ class EditGroupViewModel @AssistedInject constructor( // Output: the source-of-truth group information. Other states are derived from this. private val groupInfo: StateFlow>?> = - configFactory.configUpdateNotifications + (configFactory.configUpdateNotifications as Flow) .onStart { emit(Unit) } .map { withContext(Dispatchers.Default) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt index c042f40e5a..5b191351c0 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt @@ -3,19 +3,14 @@ package org.thoughtcrime.securesms.groups import android.content.Context import com.google.protobuf.ByteString import dagger.hilt.android.qualifiers.ApplicationContext -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.withContext import network.loki.messenger.libsession_util.ConfigBase.Companion.PRIORITY_VISIBLE -import network.loki.messenger.libsession_util.GroupInfoConfig -import network.loki.messenger.libsession_util.GroupKeysConfig -import network.loki.messenger.libsession_util.GroupMembersConfig -import network.loki.messenger.libsession_util.UserGroupsConfig import network.loki.messenger.libsession_util.util.Conversation import network.loki.messenger.libsession_util.util.GroupInfo import network.loki.messenger.libsession_util.util.GroupMember +import network.loki.messenger.libsession_util.util.INVITE_STATUS_FAILED import network.loki.messenger.libsession_util.util.INVITE_STATUS_SENT import network.loki.messenger.libsession_util.util.Sodium import network.loki.messenger.libsession_util.util.UserPic @@ -24,7 +19,6 @@ import org.session.libsession.database.userAuth import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.contacts.Contact import org.session.libsession.messaging.groups.GroupManagerV2 -import org.session.libsession.messaging.jobs.ConfigurationSyncJob.Companion.messageInformation import org.session.libsession.messaging.jobs.InviteContactsJob import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.messages.Destination @@ -39,12 +33,11 @@ import org.session.libsession.snode.OwnedSwarmAuth import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeMessage import org.session.libsession.snode.model.BatchResponse -import org.session.libsession.snode.model.StoreMessageResponse import org.session.libsession.snode.utilities.await import org.session.libsession.utilities.Address import org.session.libsession.utilities.SSKEnvironment +import org.session.libsession.utilities.getClosedGroup import org.session.libsession.utilities.recipients.Recipient -import org.session.libsession.utilities.withGroupConfigsOrNull import org.session.libsignal.messages.SignalServiceGroup import org.session.libsignal.protos.SignalServiceProtos.DataMessage import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateDeleteMemberContentMessage @@ -60,7 +53,6 @@ import org.thoughtcrime.securesms.database.LokiMessageDatabase import org.thoughtcrime.securesms.database.MmsSmsDatabase import org.thoughtcrime.securesms.dependencies.ConfigFactory import org.thoughtcrime.securesms.dependencies.PollerFactory -import org.thoughtcrime.securesms.util.ConfigurationMessageUtilities import javax.inject.Inject import javax.inject.Singleton @@ -97,8 +89,6 @@ class GroupManagerV2Impl @Inject constructor( ): Recipient = withContext(dispatcher) { val ourAccountId = requireNotNull(storage.getUserPublicKey()) { "Our account ID is not available" } - val ourKeys = - requireNotNull(storage.getUserED25519KeyPair()) { "Our ED25519 key pair is not available" } val ourProfile = storage.getUserProfile() val groupCreationTimestamp = SnodeAPI.nowWithOffset @@ -108,9 +98,8 @@ class GroupManagerV2Impl @Inject constructor( configs.userGroups.createGroup().also(configs.userGroups::set) } - val adminKey = checkNotNull(group.adminKey) { "Admin key is null for new group creation." } + checkNotNull(group.adminKey) { "Admin key is null for new group creation." } val groupId = group.groupAccountId - val groupAuth = OwnedSwarmAuth.ofClosedGroup(groupId, adminKey) try { configFactory.withMutableGroupConfigs(groupId) { configs -> @@ -141,7 +130,7 @@ class GroupManagerV2Impl @Inject constructor( ) // Manually re-key to prevent issue with linked admin devices - configs.rekeys() + configs.rekey() } configFactory.withMutableUserConfigs { @@ -238,10 +227,10 @@ class GroupManagerV2Impl @Inject constructor( ) ) } else { - configs.rekeys() + configs.rekey() } - newMembers.map { configs.groupKeys.makeSubAccount(group) } + newMembers.map { configs.groupKeys.getSubAccountToken(it) } } @@ -301,9 +290,10 @@ class GroupManagerV2Impl @Inject constructor( } override suspend fun handleMemberLeft(message: GroupUpdated, closedGroupId: AccountId) { - val userGroups = configFactory.userGroups ?: return val closedGroupHexString = closedGroupId.hexString - val closedGroup = userGroups.getClosedGroup(closedGroupId.hexString) ?: return + val closedGroup = + configFactory.withUserConfigs { it.userGroups.getClosedGroup(closedGroupId.hexString) } + ?: return if (closedGroup.hasAdminKey()) { // re-key and do a new config removing the previous member doRemoveMembers( @@ -313,22 +303,27 @@ class GroupManagerV2Impl @Inject constructor( removeMemberMessages = false ) } else { - configFactory.getGroupMemberConfig(closedGroupId)?.use { memberConfig -> - // if the leaving member is an admin, disable the group and remove it - // This is just to emulate the "existing" group behaviour, this will need to be removed in future - if (memberConfig.get(message.sender!!)?.admin == true) { - pollerFactory.pollerFor(closedGroupId)?.stop() - storage.getThreadId(Address.fromSerialized(closedGroupHexString)) - ?.let(storage::deleteConversation) - configFactory.removeGroup(closedGroupId) - } + val hasAnyAdminRemaining = configFactory.withGroupConfigs(closedGroupId) { configs -> + configs.groupMembers.all() + .asSequence() + .filterNot { it.sessionId == message.sender } + .any { it.admin && !it.removed } + } + + // if the leaving member is an admin, disable the group and remove it + // This is just to emulate the "existing" group behaviour, this will need to be removed in future + if (!hasAnyAdminRemaining) { + pollerFactory.pollerFor(closedGroupId)?.stop() + storage.getThreadId(Address.fromSerialized(closedGroupHexString)) + ?.let(storage::deleteConversation) + configFactory.removeGroup(closedGroupId) } } } override suspend fun leaveGroup(group: AccountId, deleteOnLeave: Boolean) { val canSendGroupMessage = - configFactory.userGroups?.getClosedGroup(group.hexString)?.kicked != true + configFactory.withUserConfigs { it.userGroups.getClosedGroup(group.hexString) }?.kicked != true val address = Address.fromSerialized(group.hexString) if (canSendGroupMessage) { @@ -358,79 +353,81 @@ class GroupManagerV2Impl @Inject constructor( if (deleteOnLeave) { storage.getThreadId(address)?.let(storage::deleteConversation) configFactory.removeGroup(group) - ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded(application) } } - override suspend fun promoteMember(group: AccountId, members: List): Unit = - withContext(dispatcher) { - val adminKey = requireAdminAccess(group) + override suspend fun promoteMember( + group: AccountId, + members: List + ): Unit = withContext(dispatcher) { + val adminKey = requireAdminAccess(group) + val groupName = configFactory.withGroupConfigs(group) { it.groupInfo.getName() } - configFactory.withGroupConfigsOrNull(group) { info, membersConfig, keys -> - // Promote the members by sending a message containing the admin key to each member's swarm, - // we do this concurrently and then update the group configs after all the messages are sent. - val promoteResult = members.asSequence() - .mapNotNull { membersConfig.get(it.hexString) } - .map { memberConfig -> - async { - val message = GroupUpdated( - GroupUpdateMessage.newBuilder() - .setPromoteMessage( - DataMessage.GroupUpdatePromoteMessage.newBuilder() - .setGroupIdentitySeed(ByteString.copyFrom(adminKey)) - .setName(info.getName()) - ) - .build() - ) + // Send out the promote message to the members concurrently + val promotionDeferred = members.associateWith { member -> + async { + val message = GroupUpdated( + GroupUpdateMessage.newBuilder() + .setPromoteMessage( + DataMessage.GroupUpdatePromoteMessage.newBuilder() + .setGroupIdentitySeed(ByteString.copyFrom(adminKey)) + .setName(groupName) + ) + .build() + ) - try { - MessageSender.sendNonDurably( - message = message, - address = Address.fromSerialized(memberConfig.sessionId), - isSyncMessage = false - ).await() - - memberConfig.setPromoteSent() - } catch (ec: Exception) { - Log.e(TAG, "Failed to send promote message", ec) - memberConfig.setPromoteFailed() - } - } - } - .toList() - - for (result in promoteResult) { - membersConfig.set(result.await()) - } - - configFactory.saveGroupConfigs(keys, info, membersConfig) + MessageSender.sendNonDurably( + message = message, + address = Address.fromSerialized(member.hexString), + isSyncMessage = false + ).await() } - - // Send a group update message to the group telling members someone has been promoted - val groupDestination = Destination.ClosedGroup(group.hexString) - ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded(groupDestination) - val timestamp = SnodeAPI.nowWithOffset - val signature = SodiumUtilities.sign( - buildMemberChangeSignature(GroupUpdateMemberChangeMessage.Type.PROMOTED, timestamp), - adminKey - ) - val message = GroupUpdated( - GroupUpdateMessage.newBuilder() - .setMemberChangeMessage( - GroupUpdateMemberChangeMessage.newBuilder() - .addAllMemberSessionIds(members.map { it.hexString }) - .setType(GroupUpdateMemberChangeMessage.Type.PROMOTED) - .setAdminSignature(ByteString.copyFrom(signature)) - ) - .build() - ).apply { - sentTimestamp = timestamp - } - - MessageSender.send(message, Address.fromSerialized(group.hexString)) - storage.insertGroupInfoChange(message, group) } + // Wait and gather all the promote message sending result into a result map + val promotedByMemberIDs = promotionDeferred + .mapValues { + runCatching { it.value.await() }.isSuccess + } + + // Update each member's status + configFactory.withMutableGroupConfigs(group) { configs -> + promotedByMemberIDs.asSequence() + .mapNotNull { (member, success) -> + configs.groupMembers.get(member.hexString)?.copy( + promotionStatus = if (success) { + INVITE_STATUS_SENT + } else { + INVITE_STATUS_FAILED + } + ) + } + .forEach(configs.groupMembers::set) + } + + // Send a group update message to the group telling members someone has been promoted + val timestamp = SnodeAPI.nowWithOffset + val signature = SodiumUtilities.sign( + buildMemberChangeSignature(GroupUpdateMemberChangeMessage.Type.PROMOTED, timestamp), + adminKey + ) + val message = GroupUpdated( + GroupUpdateMessage.newBuilder() + .setMemberChangeMessage( + GroupUpdateMemberChangeMessage.newBuilder() + .addAllMemberSessionIds(members.map { it.hexString }) + .setType(GroupUpdateMemberChangeMessage.Type.PROMOTED) + .setAdminSignature(ByteString.copyFrom(signature)) + ) + .build() + ).apply { + sentTimestamp = timestamp + } + + MessageSender.send(message, Address.fromSerialized(group.hexString)) + storage.insertGroupInfoChange(message, group) + } + private suspend fun doRemoveMembers( group: AccountId, removedMembers: List, @@ -440,26 +437,27 @@ class GroupManagerV2Impl @Inject constructor( val adminKey = requireAdminAccess(group) val groupAuth = OwnedSwarmAuth.ofClosedGroup(group, adminKey) - configFactory.withGroupConfigsOrNull(group) { info, members, keys -> - // To remove a member from a group, we need to first: - // 1. Notify the swarm that this member's key has bene revoked - // 2. Send a "kicked" message to a special namespace that the kicked member can still read - // 3. Optionally, send "delete member messages" to the group. (So that every device in the group - // delete this member's messages locally.) - // These three steps will be included in a sequential call as they all need to be done in order. - // After these steps are all done, we will do the following: - // Update the group configs to remove the member, sync if needed, then - // delete the member's messages locally and remotely. + // To remove a member from a group, we need to first: + // 1. Notify the swarm that this member's key has bene revoked + // 2. Send a "kicked" message to a special namespace that the kicked member can still read + // 3. Optionally, send "delete member messages" to the group. (So that every device in the group + // delete this member's messages locally.) + // These three steps will be included in a sequential call as they all need to be done in order. + // After these steps are all done, we will do the following: + // Update the group configs to remove the member, sync if needed, then + // delete the member's messages locally and remotely. + + val essentialRequests = configFactory.withGroupConfigs(group) { configs -> val messageSendTimestamp = SnodeAPI.nowWithOffset - val essentialRequests = buildList { + buildList { this += SnodeAPI.buildAuthenticatedRevokeSubKeyBatchRequest( groupAdminAuth = groupAuth, - subAccountTokens = removedMembers.map(keys::getSubAccountToken) + subAccountTokens = removedMembers.map(configs.groupKeys::getSubAccountToken) ) this += Sodium.encryptForMultipleSimple( - messages = removedMembers.map { "${it.hexString}-${keys.currentGeneration()}".encodeToByteArray() } + messages = removedMembers.map { "${it.hexString}-${configs.groupKeys.currentGeneration()}".encodeToByteArray() } .toTypedArray(), recipients = removedMembers.map { it.pubKeyBytes }.toTypedArray(), ed25519SecretKey = adminKey, @@ -506,132 +504,97 @@ class GroupManagerV2Impl @Inject constructor( ) } } + } - val snode = SnodeAPI.getSingleTargetSnode(group.hexString).await() - val responses = SnodeAPI.getBatchResponse( - snode, - group.hexString, - essentialRequests, - sequence = true - ) + val snode = SnodeAPI.getSingleTargetSnode(group.hexString).await() + val responses = SnodeAPI.getBatchResponse( + snode, + group.hexString, + essentialRequests, + sequence = true + ) - responses.requireAllRequestsSuccessful("Failed to execute essential steps for removing member") + responses.requireAllRequestsSuccessful("Failed to execute essential steps for removing member") - // Next step: update group configs, rekey, remove member messages if required - val messagesToDelete = mutableListOf() - for (member in removedMembers) { - members.erase(member.hexString) - } + // Next step: update group configs, rekey, remove member messages if required + configFactory.withMutableGroupConfigs(group) { configs -> + removedMembers.forEach { configs.groupMembers.erase(it.hexString) } + configs.rekey() + } - keys.rekey(info, members) - - if (removeMemberMessages) { - val threadId = storage.getThreadId(Address.fromSerialized(group.hexString)) - if (threadId != null) { - for (member in removedMembers) { - for (msg in mmsSmsDatabase.getUserMessages(threadId, member.hexString)) { - val serverHash = lokiDatabase.getMessageServerHash(msg.id, msg.isMms) - if (serverHash != null) { - messagesToDelete.add(serverHash) - } + if (removeMemberMessages) { + val threadId = storage.getThreadId(Address.fromSerialized(group.hexString)) + if (threadId != null) { + val messagesToDelete = mutableListOf() + for (member in removedMembers) { + for (msg in mmsSmsDatabase.getUserMessages(threadId, member.hexString)) { + val serverHash = lokiDatabase.getMessageServerHash(msg.id, msg.isMms) + if (serverHash != null) { + messagesToDelete.add(serverHash) } - - storage.deleteMessagesByUser(threadId, member.hexString) } - } - } - val requests = buildList { - keys.messageInformation(groupAuth)?.let { - this += "Sync keys config messages" to it.batch + storage.deleteMessagesByUser(threadId, member.hexString) } - this += "Sync info config messages" to info.messageInformation( - messagesToDelete, - groupAuth - ).batch - - this += "Sync member config messages" to members.messageInformation( - messagesToDelete, - groupAuth - ).batch - - this += "Delete outdated config and member messages" to SnodeAPI.buildAuthenticatedDeleteBatchInfo( - groupAuth, - messagesToDelete - ) - } - - val response = SnodeAPI.getBatchResponse( - snode = snode, - publicKey = group.hexString, - requests = requests.map { it.second } - ) - - response.requireAllRequestsSuccessful("Failed to remove members") - - // Persist the changes - configFactory.saveGroupConfigs(keys, info, members) - - if (sendRemovedMessage) { - val timestamp = messageSendTimestamp - val signature = SodiumUtilities.sign( - buildMemberChangeSignature( - GroupUpdateMemberChangeMessage.Type.REMOVED, - timestamp - ), - adminKey - ) - - val updateMessage = GroupUpdateMessage.newBuilder() - .setMemberChangeMessage( - GroupUpdateMemberChangeMessage.newBuilder() - .addAllMemberSessionIds(removedMembers.map { it.hexString }) - .setType(GroupUpdateMemberChangeMessage.Type.REMOVED) - .setAdminSignature(ByteString.copyFrom(signature)) + SnodeAPI.sendBatchRequest( + snode, group.hexString, SnodeAPI.buildAuthenticatedDeleteBatchInfo( + groupAuth, + messagesToDelete ) - .build() - val message = GroupUpdated( - updateMessage - ).apply { sentTimestamp = timestamp } - MessageSender.send(message, Destination.ClosedGroup(group.hexString), false) - storage.insertGroupInfoChange(message, group) + ) } } - ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded( - Destination.ClosedGroup(group.hexString) - ) + if (sendRemovedMessage) { + val timestamp = SnodeAPI.nowWithOffset + val signature = SodiumUtilities.sign( + buildMemberChangeSignature( + GroupUpdateMemberChangeMessage.Type.REMOVED, + timestamp + ), + adminKey + ) + + val updateMessage = GroupUpdateMessage.newBuilder() + .setMemberChangeMessage( + GroupUpdateMemberChangeMessage.newBuilder() + .addAllMemberSessionIds(removedMembers.map { it.hexString }) + .setType(GroupUpdateMemberChangeMessage.Type.REMOVED) + .setAdminSignature(ByteString.copyFrom(signature)) + ) + .build() + val message = GroupUpdated( + updateMessage + ).apply { sentTimestamp = timestamp } + MessageSender.send(message, Destination.ClosedGroup(group.hexString), false) + storage.insertGroupInfoChange(message, group) + } } override suspend fun respondToInvitation(groupId: AccountId, approved: Boolean) = withContext(dispatcher) { - val groups = requireNotNull(configFactory.userGroups) { - "User groups config is not available" - } + val group = requireNotNull( + configFactory.withUserConfigs { it.userGroups.getClosedGroup(groupId.hexString) } + ) { "User groups config is not available" } val threadId = checkNotNull(storage.getThreadId(Address.fromSerialized(groupId.hexString))) { "No thread has been created for the group" } - val group = requireNotNull(groups.getClosedGroup(groupId.hexString)) { - "Group must have been created into the config object before responding to an invitation" - } - // Whether approved or not, delete the invite lokiDatabase.deleteGroupInviteReferrer(threadId) if (approved) { - approveGroupInvite(groups, group, threadId) + approveGroupInvite(group, threadId) } else { - groups.eraseClosedGroup(groupId.hexString) + configFactory.withMutableUserConfigs { it.userGroups.eraseClosedGroup(groupId.hexString) } storage.deleteConversation(threadId) } } private fun approveGroupInvite( - groups: UserGroupsConfig, group: GroupInfo.ClosedGroupInfo, threadId: Long, ) { @@ -640,9 +603,9 @@ class GroupManagerV2Impl @Inject constructor( } // Clear the invited flag of the group in the config - groups.set(group.copy(invited = false)) - configFactory.persist(forConfigObject = groups, timestamp = SnodeAPI.nowWithOffset) - ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded(application) + configFactory.withMutableUserConfigs { configs -> + configs.userGroups.set(group.copy(invited = false)) + } if (group.adminKey == null) { // Send an invite response to the group if we are invited as a regular member @@ -659,19 +622,13 @@ class GroupManagerV2Impl @Inject constructor( ) } else { // If we are invited as admin, we can just update the group info ourselves - configFactory.withGroupConfigsOrNull(group.groupAccountId) { info, members, keys -> - members.get(key)?.let { member -> - members.set(member.setPromoteSuccess().setAccepted()) - - configFactory.saveGroupConfigs(keys, info, members) + configFactory.withMutableGroupConfigs(group.groupAccountId) { configs -> + configs.groupMembers.get(key)?.let { member -> + configs.groupMembers.set(member.setPromoteSuccess().setAccepted()) } Unit } - - ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded( - destination = Destination.ClosedGroup(group.groupAccountId.hexString) - ) } pollerFactory.pollerFor(group.groupAccountId)?.start() @@ -696,8 +653,12 @@ class GroupManagerV2Impl @Inject constructor( if (inviteMessageHash != null) { val auth = requireNotNull(storage.userAuth) { "No current user available" } SnodeAPI.sendBatchRequest( - auth.accountId, - SnodeAPI.buildAuthenticatedDeleteBatchInfo(auth, listOf(inviteMessageHash)), + snode = SnodeAPI.getSingleTargetSnode(groupId.hexString).await(), + publicKey = auth.accountId.hexString, + request = SnodeAPI.buildAuthenticatedDeleteBatchInfo( + auth, + listOf(inviteMessageHash) + ), ) } } @@ -709,12 +670,9 @@ class GroupManagerV2Impl @Inject constructor( promoter: AccountId, promoteMessageHash: String? ) = withContext(dispatcher) { - val groups = requireNotNull(configFactory.userGroups) { - "User groups config is not available" - } - val userAuth = requireNotNull(storage.userAuth) { "No current user available" } - var group = groups.getClosedGroup(groupId.hexString) + val group = + configFactory.withUserConfigs { it.userGroups.getClosedGroup(groupId.hexString) } if (group == null) { // If we haven't got the group in the config, it could mean that we haven't @@ -729,34 +687,27 @@ class GroupManagerV2Impl @Inject constructor( ) } else { // If we have the group in the config, we can just update the admin key - group = group.copy(adminKey = adminKey) - groups.set(group) - configFactory.persist(groups, SnodeAPI.nowWithOffset) + configFactory.withMutableUserConfigs { + it.userGroups.set(group.copy(adminKey = adminKey)) + } // Update our promote state - configFactory.withGroupConfigsOrNull(groupId) { info, members, keys -> - members.get(userAuth.accountId.hexString)?.let { member -> - members.set(member.setPromoteSuccess()) - - configFactory.saveGroupConfigs(keys, info, members) + configFactory.withMutableGroupConfigs(groupId) { configs -> + configs.groupMembers.get(userAuth.accountId.hexString)?.let { member -> + configs.groupMembers.set(member.setPromoteSuccess()) } Unit } - - ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded( - destination = Destination.ClosedGroup(groupId.hexString) - ) - - ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded(application) } // Delete the promotion message remotely if (promoteMessageHash != null) { - SnodeAPI.sendBatchRequest( - userAuth.accountId, - SnodeAPI.buildAuthenticatedDeleteBatchInfo(userAuth, listOf(promoteMessageHash)), - ) + SnodeAPI.deleteMessage( + userAuth.accountId.hexString, + userAuth, + listOf(promoteMessageHash) + ).await() } } @@ -777,12 +728,8 @@ class GroupManagerV2Impl @Inject constructor( fromPromotion: Boolean, inviter: AccountId, ) { - val groups = requireNotNull(configFactory.userGroups) { - "User groups config is not available" - } - // If we have already received an invitation in the past, we should not process this one - if (groups.getClosedGroup(groupId.hexString)?.invited == true) { + if (configFactory.withUserConfigs { it.userGroups.getClosedGroup(groupId.hexString) }?.invited == true) { return } @@ -799,15 +746,17 @@ class GroupManagerV2Impl @Inject constructor( invited = !shouldAutoApprove, name = groupName, ) - groups.set(closedGroupInfo) - configFactory.persist(groups, SnodeAPI.nowWithOffset) + configFactory.withMutableUserConfigs { + it.userGroups.set(closedGroupInfo) + } + profileManager.setName(application, recipient, groupName) val groupThreadId = storage.getOrCreateThreadIdFor(recipient.address) storage.setRecipientApprovedMe(recipient, true) storage.setRecipientApproved(recipient, shouldAutoApprove) if (shouldAutoApprove) { - approveGroupInvite(groups, closedGroupInfo, groupThreadId) + approveGroupInvite(closedGroupInfo, groupThreadId) } else { lokiDatabase.addGroupInviteReferrer(groupThreadId, inviter.hexString) storage.insertGroupInviteControlMessage( @@ -829,28 +778,18 @@ class GroupManagerV2Impl @Inject constructor( return@withContext } - val groups = requireNotNull(configFactory.userGroups) { - "User groups config is not available" - } - - val adminKey = groups.getClosedGroup(groupId.hexString)?.adminKey + val adminKey = configFactory.getClosedGroup(groupId)?.adminKey if (adminKey == null || adminKey.isEmpty()) { return@withContext // We don't have the admin key, we can't process the invite response } - configFactory.withGroupConfigsOrNull(groupId) { info, members, keys -> - val member = members.get(sender.hexString) - if (member == null) { + configFactory.withMutableGroupConfigs(groupId) { configs -> + val member = configs.groupMembers.get(sender.hexString) + if (member != null) { + configs.groupMembers.set(member.setAccepted()) + } else { Log.e(TAG, "User wasn't in the group membership to add!") - return@withContext } - - members.set(member.setAccepted()) - - configFactory.saveGroupConfigs(keys, info, members) - ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded( - Destination.ClosedGroup(groupId.hexString) - ) } } @@ -861,26 +800,24 @@ class GroupManagerV2Impl @Inject constructor( pollerFactory.pollerFor(groupId)?.stop() val userId = requireNotNull(storage.getUserPublicKey()) { "No current user available" } - val userGroups = - requireNotNull(configFactory.userGroups) { "User groups config is not available" } - val group = userGroups.getClosedGroup(groupId.hexString) ?: return@withContext + val group = configFactory.getClosedGroup(groupId) ?: return@withContext // Retrieve the group name one last time from the group info, // as we are going to clear the keys, we won't have the chance to // read the group name anymore. - val groupName = configFactory.getGroupInfoConfig(groupId) - ?.use { it.getName() } - ?: group.name + val groupName = configFactory.withGroupConfigs(groupId) { configs -> + configs.groupInfo.getName() + } - userGroups.set( - group.copy( - authData = null, - adminKey = null, - name = groupName + configFactory.withMutableUserConfigs { + it.userGroups.set( + group.copy( + authData = null, + adminKey = null, + name = groupName + ) ) - ) - - configFactory.persist(userGroups, SnodeAPI.nowWithOffset) + } storage.insertIncomingInfoMessage( context = MessagingModuleConfiguration.shared.context, @@ -898,17 +835,10 @@ class GroupManagerV2Impl @Inject constructor( withContext(dispatcher) { val adminKey = requireAdminAccess(groupId) - configFactory.getGroupInfoConfig(groupId)?.use { infoConfig -> - infoConfig.setName(newName) - configFactory.persist( - infoConfig, - SnodeAPI.nowWithOffset, - forPublicKey = groupId.hexString - ) + configFactory.withMutableGroupConfigs(groupId) { + it.groupInfo.setName(newName) } - val groupDestination = Destination.ClosedGroup(groupId.hexString) - ConfigurationMessageUtilities.forceSyncConfigurationNowIfNeeded(groupDestination) val timestamp = SnodeAPI.nowWithOffset val signature = SodiumUtilities.sign( buildInfoChangeVerifier(GroupUpdateInfoChangeMessage.Type.NAME, timestamp), @@ -944,9 +874,7 @@ class GroupManagerV2Impl @Inject constructor( // meanwhile, if we are admin we can just delete those messages from the group swarm, and otherwise // the admins can pick up the group message and delete the messages on our behalf. - val userGroups = - requireNotNull(configFactory.userGroups) { "User groups config is not available" } - val group = requireNotNull(userGroups.getClosedGroup(groupId.hexString)) { + val group = requireNotNull(configFactory.getClosedGroup(groupId)) { "Group doesn't exist" } val userPubKey = requireNotNull(storage.getUserPublicKey()) { "No current user available" } @@ -965,11 +893,8 @@ class GroupManagerV2Impl @Inject constructor( // If we are admin, we can delete the messages from the group swarm group.adminKey?.let { adminKey -> - deleteMessageFromGroupSwarm( - groupId, - OwnedSwarmAuth.ofClosedGroup(groupId, adminKey), - messageHashes - ) + SnodeAPI.deleteMessage(groupId.hexString, OwnedSwarmAuth.ofClosedGroup(groupId, adminKey), messageHashes) + .await() } // Construct a message to ask members to delete the messages, sign if we are admin, then send @@ -1043,7 +968,7 @@ class GroupManagerV2Impl @Inject constructor( } } - val adminKey = configFactory.userGroups?.getClosedGroup(groupId.hexString)?.adminKey + val adminKey = configFactory.getClosedGroup(groupId)?.adminKey if (!senderIsVerifiedAdmin && adminKey != null) { // If the deletion request comes from a non-admin, and we as an admin, will also delete // the content from the swarm, provided that the messages are actually sent by that user @@ -1053,11 +978,8 @@ class GroupManagerV2Impl @Inject constructor( groupId.hexString ) ) { - deleteMessageFromGroupSwarm( - groupId, - OwnedSwarmAuth.ofClosedGroup(groupId, adminKey), - hashes - ) + SnodeAPI.deleteMessage(groupId.hexString, OwnedSwarmAuth.ofClosedGroup(groupId, adminKey), hashes) + .await() } // The non-admin user shouldn't be able to delete other user's messages so we will @@ -1065,16 +987,6 @@ class GroupManagerV2Impl @Inject constructor( } } - private suspend fun deleteMessageFromGroupSwarm( - groupId: AccountId, - auth: OwnedSwarmAuth, - hashes: List - ) { - SnodeAPI.sendBatchRequest( - groupId, SnodeAPI.buildAuthenticatedDeleteBatchInfo(auth, hashes) - ) - } - private fun BatchResponse.requireAllRequestsSuccessful(errorMessage: String) { val firstError = this.results.firstOrNull { it.code != 200 } require(firstError == null) { "$errorMessage: ${firstError!!.body}" } diff --git a/app/src/main/java/org/thoughtcrime/securesms/notifications/PushRegistrationHandler.kt b/app/src/main/java/org/thoughtcrime/securesms/notifications/PushRegistrationHandler.kt index a8382477de..f0de8e929d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/notifications/PushRegistrationHandler.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/notifications/PushRegistrationHandler.kt @@ -9,15 +9,13 @@ import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.Job import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.filterNotNull import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.scan import kotlinx.coroutines.launch -import network.loki.messenger.libsession_util.GroupInfoConfig -import network.loki.messenger.libsession_util.GroupKeysConfig -import network.loki.messenger.libsession_util.GroupMembersConfig import org.session.libsession.database.userAuth import org.session.libsession.messaging.notifications.TokenFetcher import org.session.libsession.snode.OwnedSwarmAuth @@ -59,7 +57,7 @@ constructor( job = scope.launch(Dispatchers.Default) { combine( - configFactory.configUpdateNotifications + (configFactory.configUpdateNotifications as Flow) .debounce(500L) .onStart { emit(Unit) }, IdentityKeyUtil.CHANGES.onStart { emit(Unit) }, @@ -73,13 +71,9 @@ constructor( val userAuth = storage.userAuth ?: return@combine emptyMap() getGroupSubscriptions( - token = token, - userSecretKey = userAuth.ed25519PrivateKey + token = token ) + mapOf( - SubscriptionKey(userAuth.accountId, token) to OwnedSubscription( - userAuth, - 0 - ) + SubscriptionKey(userAuth.accountId, token) to Subscription(userAuth, 0) ) } .scan, Pair, Map>?>( @@ -106,13 +100,11 @@ constructor( val subscription = current.getValue(key) async { try { - subscription.withAuth { auth -> - pushRegistry.register( - token = key.token, - swarmAuth = auth, - namespaces = listOf(subscription.namespace) - ) - } + pushRegistry.register( + token = key.token, + swarmAuth = subscription.auth, + namespaces = listOf(subscription.namespace) + ) } catch (e: Exception) { Log.e(TAG, "Failed to register for push notification", e) } @@ -123,12 +115,10 @@ constructor( val subscription = prev.getValue(key) async { try { - subscription.withAuth { auth -> - pushRegistry.unregister( - token = key.token, - swarmAuth = auth, - ) - } + pushRegistry.unregister( + token = key.token, + swarmAuth = subscription.auth, + ) } catch (e: Exception) { Log.e(TAG, "Failed to unregister for push notification", e) } @@ -141,17 +131,16 @@ constructor( } private fun getGroupSubscriptions( - token: String, - userSecretKey: ByteArray + token: String ): Map { return buildMap { - val groups = configFactory.userGroups?.allClosedGroupInfo().orEmpty() + val groups = configFactory.withUserConfigs { it.userGroups.allClosedGroupInfo() } for (group in groups) { val adminKey = group.adminKey if (adminKey != null && adminKey.isNotEmpty()) { put( SubscriptionKey(group.groupAccountId, token), - OwnedSubscription( + Subscription( auth = OwnedSwarmAuth.ofClosedGroup(group.groupAccountId, adminKey), namespace = Namespace.GROUPS() ) @@ -161,15 +150,11 @@ constructor( val authData = group.authData if (authData != null && authData.isNotEmpty()) { - val subscription = - configFactory.withGroupConfigsOrNull(group.groupAccountId) { info, members, keys -> - SubAccountSubscription( - authData = authData, - groupInfoConfigDump = info.dump(), - groupMembersConfigDump = members.dump(), - groupKeysConfigDump = keys.dump(), - groupId = group.groupAccountId, - userSecretKey = userSecretKey + val subscription = configFactory.getGroupAuth(group.groupAccountId) + ?.let { + Subscription( + auth = it, + namespace = Namespace.GROUPS() ) } @@ -181,53 +166,6 @@ constructor( } } - private data class SubscriptionKey( - val accountId: AccountId, - val token: String, - ) - - private sealed interface Subscription { - suspend fun withAuth(cb: suspend (SwarmAuth) -> Unit) - val namespace: Int - } - - private class OwnedSubscription(val auth: OwnedSwarmAuth, override val namespace: Int) : - Subscription { - override suspend fun withAuth(cb: suspend (SwarmAuth) -> Unit) { - cb(auth) - } - } - - private class SubAccountSubscription( - val groupId: AccountId, - val userSecretKey: ByteArray, - val authData: ByteArray, - val groupInfoConfigDump: ByteArray, - val groupMembersConfigDump: ByteArray, - val groupKeysConfigDump: ByteArray - ) : Subscription { - override suspend fun withAuth(cb: suspend (SwarmAuth) -> Unit) { - GroupInfoConfig.newInstance(groupId.pubKeyBytes, initialDump = groupInfoConfigDump) - .use { info -> - GroupMembersConfig.newInstance( - groupId.pubKeyBytes, - initialDump = groupMembersConfigDump - ).use { members -> - GroupKeysConfig.newInstance( - userSecretKey = userSecretKey, - groupPublicKey = groupId.pubKeyBytes, - initialDump = groupKeysConfigDump, - info = info, - members = members - ).use { keys -> - cb(GroupSubAccountSwarmAuth(keys, groupId, authData)) - } - } - } - } - - override val namespace: Int - get() = Namespace.GROUPS() - } - + private data class SubscriptionKey(val accountId: AccountId, val token: String) + private data class Subscription(val auth: SwarmAuth, val namespace: Int) } \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/onboarding/loading/LoadingActivity.kt b/app/src/main/java/org/thoughtcrime/securesms/onboarding/loading/LoadingActivity.kt index abf0471598..ecb923ca10 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/onboarding/loading/LoadingActivity.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/onboarding/loading/LoadingActivity.kt @@ -28,8 +28,6 @@ class LoadingActivity: BaseActionBarActivity() { private val viewModel: LoadingViewModel by viewModels() private fun register(loadFailed: Boolean) { - prefs.setLastConfigurationSyncTime(System.currentTimeMillis()) - when { loadFailed -> startPickDisplayNameActivity(loadFailed = true) else -> startHomeActivity(isNewAccount = false, isFromOnboarding = true) diff --git a/app/src/main/java/org/thoughtcrime/securesms/onboarding/loading/LoadingViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/onboarding/loading/LoadingViewModel.kt index a7871d5620..5b8e8bac58 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/onboarding/loading/LoadingViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/onboarding/loading/LoadingViewModel.kt @@ -16,12 +16,16 @@ import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.timeout import kotlinx.coroutines.launch import kotlinx.coroutines.withContext +import org.session.libsession.utilities.ConfigFactoryProtocol +import org.session.libsession.utilities.ConfigUpdateNotification import org.session.libsession.utilities.TextSecurePreferences import javax.inject.Inject import kotlin.time.Duration @@ -43,7 +47,8 @@ private val REFRESH_TIME = 50.milliseconds @OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class) @HiltViewModel internal class LoadingViewModel @Inject constructor( - val prefs: TextSecurePreferences + val prefs: TextSecurePreferences, + val configFactory: ConfigFactoryProtocol, ): ViewModel() { private val state = MutableStateFlow(State.LOADING) @@ -65,14 +70,19 @@ internal class LoadingViewModel @Inject constructor( .collectLatest { _progress.value = it } } - viewModelScope.launch(Dispatchers.IO) { + viewModelScope.launch { try { - TextSecurePreferences.events - .filter { it == TextSecurePreferences.CONFIGURATION_SYNCED } - .onStart { emit(TextSecurePreferences.CONFIGURATION_SYNCED) } - .filter { prefs.getConfigurationMessageSynced() } - .timeout(TIMEOUT_TIME) - .collectLatest { onSuccess() } + configFactory.configUpdateNotifications + .filter { it == ConfigUpdateNotification.UserConfigs } + .onStart { emit(ConfigUpdateNotification.UserConfigs) } + .filter { + configFactory.withUserConfigs { configs -> + !configs.userProfile.getName().isNullOrEmpty() + } + } +// .timeout(TIMEOUT_TIME) + .first() + onSuccess() } catch (e: Exception) { onFail() } @@ -80,19 +90,15 @@ internal class LoadingViewModel @Inject constructor( } private suspend fun onSuccess() { - withContext(Dispatchers.Main) { - state.value = State.SUCCESS - delay(IDLE_DONE_TIME) - _events.emit(Event.SUCCESS) - } - } + state.value = State.SUCCESS + delay(IDLE_DONE_TIME) + _events.emit(Event.SUCCESS) +} private suspend fun onFail() { - withContext(Dispatchers.Main) { - state.value = State.FAIL - delay(IDLE_DONE_TIME) - _events.emit(Event.TIMEOUT) - } + state.value = State.FAIL + delay(IDLE_DONE_TIME) + _events.emit(Event.TIMEOUT) } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/ui/theme/Themes.kt b/app/src/main/java/org/thoughtcrime/securesms/ui/theme/Themes.kt index 2f4957565b..9ef7c23da7 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ui/theme/Themes.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/ui/theme/Themes.kt @@ -15,6 +15,8 @@ import androidx.compose.ui.platform.LocalContext import androidx.compose.ui.tooling.preview.PreviewParameterProvider import androidx.compose.ui.unit.dp import org.session.libsession.utilities.AppTextSecurePreferences +import org.session.libsession.utilities.TextSecurePreferences +import org.thoughtcrime.securesms.ApplicationContext // Globally accessible composition local objects val LocalColors = compositionLocalOf { ClassicDark() } @@ -32,11 +34,10 @@ fun invalidateComposeThemeColors() { */ @Composable fun SessionMaterialTheme( + preferences: TextSecurePreferences = + (LocalContext.current.applicationContext as ApplicationContext).textSecurePreferences, content: @Composable () -> Unit ) { - val context = LocalContext.current - val preferences = AppTextSecurePreferences(context) - val cachedColors = cachedColorsProvider ?: preferences.getColorsProvider().also { cachedColorsProvider = it } SessionMaterialTheme( diff --git a/libsession-util/src/main/cpp/config_base.cpp b/libsession-util/src/main/cpp/config_base.cpp index 5af6483371..8017d2bc7d 100644 --- a/libsession-util/src/main/cpp/config_base.cpp +++ b/libsession-util/src/main/cpp/config_base.cpp @@ -102,20 +102,6 @@ Java_network_loki_messenger_libsession_1util_ConfigBase_merge___3Lkotlin_Pair_2( }); } -JNIEXPORT jobject JNICALL -Java_network_loki_messenger_libsession_1util_ConfigBase_merge__Lkotlin_Pair_2(JNIEnv *env, jobject thiz, - jobject to_merge) { - return jni_utils::run_catching_cxx_exception_or_throws(env, [=] { - std::lock_guard lock{util::util_mutex_}; - auto conf = ptrToConfigBase(env, thiz); - std::vector> configs = { - extractHashAndData(env, to_merge)}; - auto returned = conf->merge(configs); - auto string_stack = util::build_string_stack(env, returned); - return string_stack; - }); -} - #pragma clang diagnostic pop } extern "C" diff --git a/libsession-util/src/main/cpp/config_common.cpp b/libsession-util/src/main/cpp/config_common.cpp index 74e57305e6..848c816bf7 100644 --- a/libsession-util/src/main/cpp/config_common.cpp +++ b/libsession-util/src/main/cpp/config_common.cpp @@ -30,7 +30,7 @@ Java_network_loki_messenger_libsession_1util_ConfigKt_createConfigObject( return reinterpret_cast(new session::config::UserProfile(secret_key, initial)); } else if (config_name == "UserGroups") { return reinterpret_cast(new session::config::UserGroups(secret_key, initial)); - } else if (config_name == "ConversationVolatileConfig") { + } else if (config_name == "ConvoInfoVolatile") { return reinterpret_cast(new session::config::ConvoInfoVolatile(secret_key, initial)); } else { throw std::invalid_argument("Unknown config name: " + config_name); diff --git a/libsession-util/src/main/cpp/group_keys.cpp b/libsession-util/src/main/cpp/group_keys.cpp index 956d3b7225..03ab3ff2fa 100644 --- a/libsession-util/src/main/cpp/group_keys.cpp +++ b/libsession-util/src/main/cpp/group_keys.cpp @@ -2,6 +2,8 @@ #include "group_info.h" #include "group_members.h" +#include "jni_utils.h" + extern "C" JNIEXPORT jint JNICALL Java_network_loki_messenger_libsession_1util_GroupKeysConfig_00024Companion_storageNamespace(JNIEnv* env, @@ -30,7 +32,7 @@ Java_network_loki_messenger_libsession_1util_GroupKeysConfig_00024Companion_newI secret_key_optional = secret_key_bytes; } - if (env->GetArrayLength(initial_dump) > 0) { + if (initial_dump && env->GetArrayLength(initial_dump) > 0) { auto initial_dump_bytes = util::ustring_from_bytes(env, initial_dump); initial_dump_optional = initial_dump_bytes; } @@ -165,21 +167,23 @@ extern "C" JNIEXPORT jbyteArray JNICALL Java_network_loki_messenger_libsession_1util_GroupKeysConfig_encrypt(JNIEnv *env, jobject thiz, jbyteArray plaintext) { - std::lock_guard lock{util::util_mutex_}; - auto ptr = ptrToKeys(env, thiz); - auto plaintext_ustring = util::ustring_from_bytes(env, plaintext); - auto enc = ptr->encrypt_message(plaintext_ustring); - return util::bytes_from_ustring(env, enc); + return jni_utils::run_catching_cxx_exception_or_throws(env, [=] { + std::lock_guard lock{util::util_mutex_}; + auto ptr = ptrToKeys(env, thiz); + auto plaintext_ustring = util::ustring_from_bytes(env, plaintext); + auto enc = ptr->encrypt_message(plaintext_ustring); + return util::bytes_from_ustring(env, enc); + }); } extern "C" JNIEXPORT jobject JNICALL Java_network_loki_messenger_libsession_1util_GroupKeysConfig_decrypt(JNIEnv *env, jobject thiz, jbyteArray ciphertext) { - std::lock_guard lock{util::util_mutex_}; - auto ptr = ptrToKeys(env, thiz); - auto ciphertext_ustring = util::ustring_from_bytes(env, ciphertext); - try { + return jni_utils::run_catching_cxx_exception_or_throws(env, [=] { + std::lock_guard lock{util::util_mutex_}; + auto ptr = ptrToKeys(env, thiz); + auto ciphertext_ustring = util::ustring_from_bytes(env, ciphertext); auto decrypted = ptr->decrypt_message(ciphertext_ustring); auto sender = decrypted.first; auto plaintext = decrypted.second; @@ -189,12 +193,9 @@ Java_network_loki_messenger_libsession_1util_GroupKeysConfig_decrypt(JNIEnv *env auto pair_constructor = env->GetMethodID(pair_class, "", "(Ljava/lang/Object;Ljava/lang/Object;)V"); auto pair_obj = env->NewObject(pair_class, pair_constructor, plaintext_bytes, sender_session_id); return pair_obj; - } catch (std::exception& e) { - // TODO: maybe log here - } - - return nullptr; + }); } + extern "C" JNIEXPORT jobject JNICALL Java_network_loki_messenger_libsession_1util_GroupKeysConfig_keys(JNIEnv *env, jobject thiz) { diff --git a/libsession-util/src/main/java/network/loki/messenger/libsession_util/Config.kt b/libsession-util/src/main/java/network/loki/messenger/libsession_util/Config.kt index 5bf31eac3e..ac6f255117 100644 --- a/libsession-util/src/main/java/network/loki/messenger/libsession_util/Config.kt +++ b/libsession-util/src/main/java/network/loki/messenger/libsession_util/Config.kt @@ -48,7 +48,6 @@ interface MutableConfig : ReadableConfig { fun dump(): ByteArray fun encryptionDomain(): String fun confirmPushed(seqNo: Long, newHash: String) - fun merge(toMerge: Array>): Stack fun dirty(): Boolean } @@ -81,11 +80,8 @@ sealed class ConfigBase(pointer: Long): Config(pointer), MutableConfig { external override fun dump(): ByteArray external override fun encryptionDomain(): String external override fun confirmPushed(seqNo: Long, newHash: String) - external override fun merge(toMerge: Array>): Stack + external fun merge(toMerge: Array>): Stack external override fun currentHashes(): List - - // Singular merge - external fun merge(toMerge: Pair): Stack } diff --git a/libsession/src/main/java/org/session/libsession/messaging/configs/ConfigSyncHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/configs/ConfigSyncHandler.kt index 6c3fa303a2..379a14d305 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/configs/ConfigSyncHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/configs/ConfigSyncHandler.kt @@ -1,133 +1,255 @@ package org.session.libsession.messaging.configs -import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch -import network.loki.messenger.libsession_util.MutableConfig import network.loki.messenger.libsession_util.util.ConfigPush import org.session.libsession.database.StorageProtocol import org.session.libsession.database.userAuth +import org.session.libsession.snode.OwnedSwarmAuth import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeMessage +import org.session.libsession.snode.SwarmAuth +import org.session.libsession.snode.model.StoreMessageResponse import org.session.libsession.snode.utilities.await import org.session.libsession.utilities.ConfigFactoryProtocol +import org.session.libsession.utilities.ConfigPushResult import org.session.libsession.utilities.ConfigUpdateNotification +import org.session.libsession.utilities.UserConfigType +import org.session.libsession.utilities.getClosedGroup import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.Base64 import org.session.libsignal.utilities.Log +import org.session.libsignal.utilities.Namespace +import org.session.libsignal.utilities.Snode +import javax.inject.Inject -class ConfigSyncHandler( +private const val TAG = "ConfigSyncHandler" + +/** + * This class is responsible for sending the local config changes to the swarm. + * + * It does so by listening for changes in the config factory. + */ +class ConfigSyncHandler @Inject constructor( private val configFactory: ConfigFactoryProtocol, private val storageProtocol: StorageProtocol, - @Suppress("OPT_IN_USAGE") scope: CoroutineScope = GlobalScope, ) { - init { - scope.launch { + private var job: Job? = null + + @OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class) + fun start() { + require(job == null) { "Already started" } + + job = GlobalScope.launch { + val groupDispatchers = hashMapOf() + val userConfigDispatcher = Dispatchers.Default.limitedParallelism(1) + configFactory.configUpdateNotifications.collect { changes -> try { when (changes) { - is ConfigUpdateNotification.GroupConfigsDeleted -> {} - is ConfigUpdateNotification.GroupConfigsUpdated -> { - pushGroupConfigsChangesIfNeeded(changes.groupId) + is ConfigUpdateNotification.GroupConfigsDeleted -> { + groupDispatchers.remove(changes.groupId) + } + + is ConfigUpdateNotification.GroupConfigsUpdated -> { + // Group config pushing is limited to its own dispatcher + launch(groupDispatchers.getOrPut(changes.groupId) { + Dispatchers.Default.limitedParallelism(1) + }) { + pushGroupConfigsChangesIfNeeded(changes.groupId) + } + } + + ConfigUpdateNotification.UserConfigs -> launch(userConfigDispatcher) { + pushUserConfigChangesIfNeeded() } - ConfigUpdateNotification.UserConfigs -> pushUserConfigChangesIfNeeded() } } catch (e: Exception) { - Log.e("ConfigSyncHandler", "Error handling config update", e) + Log.e(TAG, "Error handling config update", e) } } } - } - - private suspend fun pushGroupConfigsChangesIfNeeded(groupId: AccountId): Unit = coroutineScope { } - private suspend fun pushUserConfigChangesIfNeeded(): Unit = coroutineScope { + private suspend fun pushGroupConfigsChangesIfNeeded(groupId: AccountId) = coroutineScope { + // Only admin can push group configs + val adminKey = configFactory.getClosedGroup(groupId)?.adminKey + if (adminKey == null) { + Log.i(TAG, "Skipping group config push without admin key") + return@coroutineScope + } + + // Gather data to push + val (membersPush, infoPush, keysPush) = configFactory.withMutableGroupConfigs(groupId) { configs -> + val membersPush = if (configs.groupMembers.needsPush()) { + configs.groupMembers.push() + } else { + null + } + + val infoPush = if (configs.groupInfo.needsPush()) { + configs.groupInfo.push() + } else { + null + } + + Triple(membersPush, infoPush, configs.groupKeys.pendingConfig()) + } + + // Nothing to push? + if (membersPush == null && infoPush == null && keysPush == null) { + return@coroutineScope + } + + Log.d(TAG, "Pushing group configs") + + val snode = SnodeAPI.getSingleTargetSnode(groupId.hexString).await() + val auth = OwnedSwarmAuth.ofClosedGroup(groupId, adminKey) + + // Spawn the config pushing concurrently + val membersConfigHashTask = membersPush?.let { + async { + membersPush to pushConfig( + auth, + snode, + membersPush, + Namespace.CLOSED_GROUP_MEMBERS() + ) + } + } + + val infoConfigHashTask = infoPush?.let { + async { + infoPush to pushConfig(auth, snode, infoPush, Namespace.CLOSED_GROUP_INFO()) + } + } + + // Keys push is different: it doesn't have the delete call so we don't call pushConfig + val keysPushResult = keysPush?.let { + SnodeAPI.sendBatchRequest( + snode = snode, + publicKey = auth.accountId.hexString, + request = SnodeAPI.buildAuthenticatedStoreBatchInfo( + Namespace.ENCRYPTION_KEYS(), + SnodeMessage( + auth.accountId.hexString, + Base64.encodeBytes(keysPush), + SnodeMessage.CONFIG_TTL, + SnodeAPI.nowWithOffset, + ), + auth + ), + responseType = StoreMessageResponse::class.java + ).toConfigPushResult() + } + + // Wait for all other config push to come back + val memberPushResult = membersConfigHashTask?.await() + val infoPushResult = infoConfigHashTask?.await() + + configFactory.confirmGroupConfigsPushed( + groupId, + memberPushResult, + infoPushResult, + keysPushResult + ) + + Log.i( + TAG, + "Pushed group configs, " + + "info = ${infoPush != null}, " + + "members = ${membersPush != null}, " + + "keys = ${keysPush != null}" + ) + } + + private suspend fun pushConfig( + auth: SwarmAuth, + snode: Snode, + push: ConfigPush, + namespace: Int + ): ConfigPushResult { + val response = SnodeAPI.sendBatchRequest( + snode = snode, + publicKey = auth.accountId.hexString, + request = SnodeAPI.buildAuthenticatedStoreBatchInfo( + namespace, + SnodeMessage( + auth.accountId.hexString, + Base64.encodeBytes(push.config), + SnodeMessage.CONFIG_TTL, + SnodeAPI.nowWithOffset, + ), + auth, + ), + responseType = StoreMessageResponse::class.java + ) + + if (push.obsoleteHashes.isNotEmpty()) { + SnodeAPI.sendBatchRequest( + snode = snode, + publicKey = auth.accountId.hexString, + request = SnodeAPI.buildAuthenticatedDeleteBatchInfo(auth, push.obsoleteHashes) + ) + } + + return response.toConfigPushResult() + } + + private suspend fun pushUserConfigChangesIfNeeded() = coroutineScope { val userAuth = requireNotNull(storageProtocol.userAuth) { "Current user not available" } - data class PushInformation( - val namespace: Int, - val configClass: Class, - val push: ConfigPush, - ) - // Gather all the user configs that need to be pushed val pushes = configFactory.withMutableUserConfigs { configs -> - configs.allConfigs() - .filter { it.needsPush() } - .map { config -> - PushInformation( - namespace = config.namespace(), - configClass = config.javaClass, - push = config.push(), - ) + UserConfigType.entries + .mapNotNull { type -> + val config = configs.getConfig(type) + if (!config.needsPush()) { + return@mapNotNull null + } + + type to config.push() } - .toList() } - Log.d("ConfigSyncHandler", "Pushing ${pushes.size} configs") + if (pushes.isEmpty()) { + return@coroutineScope + } + + Log.d(TAG, "Pushing ${pushes.size} user configs") val snode = SnodeAPI.getSingleTargetSnode(userAuth.accountId.hexString).await() - val pushTasks = pushes.map { info -> - val calls = buildList { - this += SnodeAPI.buildAuthenticatedStoreBatchInfo( - info.namespace, - SnodeMessage( - userAuth.accountId.hexString, - Base64.encodeBytes(info.push.config), - SnodeMessage.CONFIG_TTL, - SnodeAPI.nowWithOffset, - ), - userAuth - ) - - if (info.push.obsoleteHashes.isNotEmpty()) { - this += SnodeAPI.buildAuthenticatedDeleteBatchInfo( - messageHashes = info.push.obsoleteHashes, - auth = userAuth, - ) - } - } - + val pushTasks = pushes.map { (configType, configPush) -> async { - val responses = SnodeAPI.getBatchResponse( - snode = snode, - publicKey = userAuth.accountId.hexString, - requests = calls, - sequence = true - ) - - val firstError = responses.results.firstOrNull { !it.isSuccessful } - check(firstError == null) { - "Failed to push config change due to error: ${firstError?.body}" - } - - val hash = responses.results.first().body.get("hash").asText() - require(hash.isNotEmpty()) { - "Missing server hash for pushed config" - } - - info to hash + (configType to configPush) to pushConfig(userAuth, snode, configPush, configType.namespace) } } - val pushResults = pushTasks.awaitAll().associateBy { it.first.configClass } + val pushResults = pushTasks.awaitAll().associate { it.first.first to (it.first.second to it.second) } - Log.d("ConfigSyncHandler", "Pushed ${pushResults.size} configs") + Log.d(TAG, "Pushed ${pushResults.size} user configs") - configFactory.withMutableUserConfigs { configs -> - configs.allConfigs() - .mapNotNull { config -> pushResults[config.javaClass]?.let { Triple(config, it.first, it.second) } } - .forEach { (config, info, hash) -> - config.confirmPushed(info.push.seqNo, hash) - } - } + configFactory.confirmUserConfigsPushed( + contacts = pushResults[UserConfigType.CONTACTS], + userGroups = pushResults[UserConfigType.USER_GROUPS], + convoInfoVolatile = pushResults[UserConfigType.CONVO_INFO_VOLATILE], + userProfile = pushResults[UserConfigType.USER_PROFILE] + ) + } + + private fun StoreMessageResponse.toConfigPushResult(): ConfigPushResult { + return ConfigPushResult(hash, timestamp) } } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/groups/RemoveGroupMemberHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/groups/RemoveGroupMemberHandler.kt index d4d7641991..1a7fe3da16 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/groups/RemoveGroupMemberHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/groups/RemoveGroupMemberHandler.kt @@ -3,9 +3,12 @@ package org.session.libsession.messaging.groups import android.os.SystemClock import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.firstOrNull import kotlinx.coroutines.launch import network.loki.messenger.libsession_util.ReadableGroupKeysConfig @@ -19,24 +22,34 @@ import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeMessage import org.session.libsession.snode.utilities.await import org.session.libsession.utilities.ConfigFactoryProtocol +import org.session.libsession.utilities.TextSecurePreferences import org.session.libsignal.protos.SignalServiceProtos import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateMessage import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.Base64 import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Namespace +import javax.inject.Inject private const val TAG = "RemoveGroupMemberHandler" private const val MIN_PROCESS_INTERVAL_MILLS = 1_000L -class RemoveGroupMemberHandler( +class RemoveGroupMemberHandler @Inject constructor( private val configFactory: ConfigFactoryProtocol, - private val scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default) + private val textSecurePreferences: TextSecurePreferences, ) { - init { - scope.launch { + private val scope: CoroutineScope = GlobalScope + private var job: Job? = null + + fun start() { + require(job == null) { "Already started" } + + job = scope.launch { while (true) { + // Make sure we have a local number before we start processing + textSecurePreferences.watchLocalNumber().first { it != null } + val processStartedAt = SystemClock.uptimeMillis() try { diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index 288e6778ed..bbfca36102 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -118,6 +118,7 @@ class JobQueue : JobDelegate { while (isActive) { when (val job = queue.receive()) { + is InviteContactsJob, is NotifyPNServerJob, is AttachmentUploadJob, is GroupLeavingJob, @@ -226,6 +227,7 @@ class JobQueue : JobDelegate { OpenGroupDeleteJob.KEY, RetrieveProfileAvatarJob.KEY, GroupLeavingJob.KEY, + InviteContactsJob.KEY, LibSessionGroupLeavingJob.KEY ) allJobTypes.forEach { type -> diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt index b9b2ca62c3..98005cf0c0 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt @@ -9,7 +9,6 @@ import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch -import network.loki.messenger.libsession_util.util.GroupInfo import network.loki.messenger.libsession_util.util.Sodium import org.session.libsession.database.StorageProtocol import org.session.libsession.messaging.groups.GroupManagerV2 @@ -19,10 +18,13 @@ import org.session.libsession.messaging.jobs.MessageReceiveParameters import org.session.libsession.messaging.messages.Destination import org.session.libsession.snode.RawResponse import org.session.libsession.snode.SnodeAPI +import org.session.libsession.snode.model.RetrieveMessageResponse import org.session.libsession.snode.utilities.await import org.session.libsession.utilities.ConfigFactoryProtocol +import org.session.libsession.utilities.ConfigMessage +import org.session.libsession.utilities.getClosedGroup +import org.session.libsignal.database.LokiAPIDatabaseProtocol import org.session.libsignal.utilities.AccountId -import org.session.libsignal.utilities.Base64 import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Namespace import org.session.libsignal.utilities.Snode @@ -36,37 +38,12 @@ class ClosedGroupPoller( private val configFactoryProtocol: ConfigFactoryProtocol, private val groupManagerV2: GroupManagerV2, private val storage: StorageProtocol, + private val lokiApiDatabase: LokiAPIDatabaseProtocol, ) { - - data class ParsedRawMessage( - val data: ByteArray, - val hash: String, - val timestamp: Long - ) { - override fun equals(other: Any?): Boolean { - if (this === other) return true - if (javaClass != other?.javaClass) return false - - other as ParsedRawMessage - - if (!data.contentEquals(other.data)) return false - if (hash != other.hash) return false - if (timestamp != other.timestamp) return false - - return true - } - - override fun hashCode(): Int { - var result = data.contentHashCode() - result = 31 * result + hash.hashCode() - result = 31 * result + timestamp.hashCode() - return result - } - } - companion object { - const val POLL_INTERVAL = 3_000L - const val ENABLE_LOGGING = false + private const val POLL_INTERVAL = 3_000L + + private const val TAG = "ClosedGroupPoller" } private var job: Job? = null @@ -74,26 +51,36 @@ class ClosedGroupPoller( fun start() { if (job?.isActive == true) return // already started, don't restart - if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Starting closed group poller for ${closedGroupSessionId.hexString.take(4)}") + Log.d(TAG, "Starting closed group poller for ${closedGroupSessionId.hexString.take(4)}") job?.cancel() job = scope.launch(executor) { + var snode: Snode? = null + while (isActive) { - val group = configFactoryProtocol.withUserConfigs { it.userGroups.getClosedGroup(closedGroupSessionId.hexString) } ?: break - val nextPoll = runCatching { poll(group) } + configFactoryProtocol.getClosedGroup(closedGroupSessionId) ?: break + + if (snode == null) { + Log.i(TAG, "No Snode, fetching one") + snode = SnodeAPI.getSingleTargetSnode(closedGroupSessionId.hexString).await() + } + + val nextPoll = runCatching { poll(snode!!) } when { nextPoll.isFailure -> { - Log.e("ClosedGroupPoller", "Error polling closed group", nextPoll.exceptionOrNull()) + Log.e(TAG, "Error polling closed group", nextPoll.exceptionOrNull()) + // Clearing snode so we get a new one next time + snode = null delay(POLL_INTERVAL) } nextPoll.getOrNull() == null -> { // assume null poll time means don't continue polling, either the group has been deleted or something else - if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Stopping the closed group poller") + Log.d(TAG, "Stopping the closed group poller") break } else -> { - delay(nextPoll.getOrThrow()!!) + delay(POLL_INTERVAL) } } } @@ -105,10 +92,9 @@ class ClosedGroupPoller( job = null } - private suspend fun poll(group: GroupInfo.ClosedGroupInfo): Long? = coroutineScope { - val snode = SnodeAPI.getSingleTargetSnode(closedGroupSessionId.hexString).await() - - val groupAuth = configFactoryProtocol.getGroupAuth(closedGroupSessionId) ?: return@coroutineScope null + private suspend fun poll(snode: Snode): Unit = coroutineScope { + val groupAuth = + configFactoryProtocol.getGroupAuth(closedGroupSessionId) ?: return@coroutineScope val configHashesToExtends = configFactoryProtocol.withGroupConfigs(closedGroupSessionId) { buildSet { addAll(it.groupKeys.currentHashes()) @@ -117,91 +103,36 @@ class ClosedGroupPoller( } } - val adminKey = requireNotNull(configFactoryProtocol.withUserConfigs { it.userGroups.getClosedGroup(closedGroupSessionId.hexString) }) { + val adminKey = requireNotNull(configFactoryProtocol.withUserConfigs { + it.userGroups.getClosedGroup(closedGroupSessionId.hexString) + }) { "Group doesn't exist" }.adminKey val pollingTasks = mutableListOf>>() - pollingTasks += "Poll revoked messages" to async { + pollingTasks += "retrieving revoked messages" to async { handleRevoked( SnodeAPI.sendBatchRequest( snode, closedGroupSessionId.hexString, SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - snode = snode, + lastHash = lokiApiDatabase.getLastMessageHashValue( + snode, + closedGroupSessionId.hexString, + Namespace.REVOKED_GROUP_MESSAGES() + ).orEmpty(), auth = groupAuth, namespace = Namespace.REVOKED_GROUP_MESSAGES(), maxSize = null, ), - Map::class.java + RetrieveMessageResponse::class.java ) ) } - pollingTasks += "Poll group messages" to async { - handleMessages( - body = SnodeAPI.sendBatchRequest( - snode, - closedGroupSessionId.hexString, - SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - snode = snode, - auth = groupAuth, - namespace = Namespace.CLOSED_GROUP_MESSAGES(), - maxSize = null, - ), - Map::class.java), - snode = snode, - ) - } - - pollingTasks += "Poll group keys config" to async { - handleKeyPoll( - response = SnodeAPI.sendBatchRequest( - snode, - closedGroupSessionId.hexString, - SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - snode = snode, - auth = groupAuth, - namespace = Namespace.ENCRYPTION_KEYS(), - maxSize = null, - ), - Map::class.java), - ) - } - - pollingTasks += "Poll group info config" to async { - handleInfo( - response = SnodeAPI.sendBatchRequest( - snode, - closedGroupSessionId.hexString, - SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - snode = snode, - auth = groupAuth, - namespace = Namespace.CLOSED_GROUP_INFO(), - maxSize = null, - ), - Map::class.java), - ) - } - - pollingTasks += "Poll group members config" to async { - handleMembers( - SnodeAPI.sendBatchRequest( - snode, - closedGroupSessionId.hexString, - SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - snode = snode, - auth = groupAuth, - namespace = Namespace.CLOSED_GROUP_MEMBERS(), - maxSize = null, - ), - Map::class.java), - ) - } - if (configHashesToExtends.isNotEmpty() && adminKey != null) { - pollingTasks += "Extend group config TTL" to async { + pollingTasks += "extending group config TTL" to async { SnodeAPI.sendBatchRequest( snode, closedGroupSessionId.hexString, @@ -215,52 +146,105 @@ class ClosedGroupPoller( } } + val groupMessageRetrieval = async { + SnodeAPI.sendBatchRequest( + snode = snode, + publicKey = closedGroupSessionId.hexString, + request = SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + lastHash = lokiApiDatabase.getLastMessageHashValue( + snode, + closedGroupSessionId.hexString, + Namespace.CLOSED_GROUP_MESSAGES() + ).orEmpty(), + auth = groupAuth, + namespace = Namespace.CLOSED_GROUP_MESSAGES(), + maxSize = null, + ), + responseType = Map::class.java + ) + } + + val groupConfigRetrieval = listOf( + Namespace.ENCRYPTION_KEYS(), + Namespace.CLOSED_GROUP_INFO(), + Namespace.CLOSED_GROUP_MEMBERS() + ).map { ns -> + async { + SnodeAPI.sendBatchRequest( + snode = snode, + publicKey = closedGroupSessionId.hexString, + request = SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + lastHash = lokiApiDatabase.getLastMessageHashValue( + snode, + closedGroupSessionId.hexString, + ns + ).orEmpty(), + auth = groupAuth, + namespace = ns, + maxSize = null, + ), + responseType = RetrieveMessageResponse::class.java + ) + } + } + + // The retrieval of the config and regular messages can be done concurrently, + // however, in order for the messages to be able to be decrypted, the config messages + // must be processed first. + pollingTasks += "polling and handling group config keys and messages" to async { + val (keysMessage, infoMessage, membersMessage) = groupConfigRetrieval.map { it.await() } + saveLastMessageHash(snode, keysMessage, Namespace.ENCRYPTION_KEYS()) + saveLastMessageHash(snode, infoMessage, Namespace.CLOSED_GROUP_INFO()) + saveLastMessageHash(snode, membersMessage, Namespace.CLOSED_GROUP_MEMBERS()) + handleGroupConfigMessages(keysMessage, infoMessage, membersMessage) + + val regularMessages = groupMessageRetrieval.await() + handleMessages(regularMessages, snode) + } + + // Wait for all tasks to complete, gather any exceptions happened during polling val errors = pollingTasks.mapNotNull { (name, task) -> runCatching { task.await() } .exceptionOrNull() ?.takeIf { it !is CancellationException } - ?.let { RuntimeException("Error executing: $name", it) } + ?.let { RuntimeException("Error $name", it) } } + // If there were any errors, throw the first one and add the rest as "suppressed" exceptions if (errors.isNotEmpty()) { - throw PollerException("Error polling closed group", errors) - } - - POLL_INTERVAL // this might change in future - } - - private fun parseMessages(body: RawResponse): List { - val messages = body["messages"] as? List<*> ?: return emptyList() - return messages.mapNotNull { messageMap -> - val rawMessageAsJSON = messageMap as? Map<*, *> ?: return@mapNotNull null - val base64EncodedData = rawMessageAsJSON["data"] as? String ?: return@mapNotNull null - val hash = rawMessageAsJSON["hash"] as? String ?: return@mapNotNull null - val timestamp = rawMessageAsJSON["timestamp"] as? Long ?: return@mapNotNull null - val data = base64EncodedData.let { Base64.decode(it) } - ParsedRawMessage(data, hash, timestamp) + throw errors.first().apply { + for (index in 1 until errors.size) { + addSuppressed(errors[index]) + } + } } } - private suspend fun handleRevoked(body: RawResponse) { - // This shouldn't ever return null at this point - val messages = body["messages"] as? List<*> - ?: return Log.w("GroupPoller", "body didn't contain a list of messages") - messages.forEach { messageMap -> - val rawMessageAsJSON = messageMap as? Map<*,*> - ?: return@forEach Log.w("GroupPoller", "rawMessage wasn't a map as expected") - val data = rawMessageAsJSON["data"] as? String ?: return@forEach - val hash = rawMessageAsJSON["hash"] as? String ?: return@forEach - val timestamp = rawMessageAsJSON["timestamp"] as? Long ?: return@forEach - Log.d("GroupPoller", "Handling message with hash $hash") + private fun RetrieveMessageResponse.Message.toConfigMessage(): ConfigMessage { + return ConfigMessage(hash, data, timestamp) + } + private fun saveLastMessageHash(snode: Snode, body: RetrieveMessageResponse, namespace: Int) { + if (body.messages.isNotEmpty()) { + lokiApiDatabase.setLastMessageHashValue( + snode = snode, + publicKey = closedGroupSessionId.hexString, + newValue = body.messages.last().hash, + namespace = namespace + ) + } + } + + private suspend fun handleRevoked(body: RetrieveMessageResponse) { + body.messages.forEach { msg -> val decoded = configFactoryProtocol.maybeDecryptForUser( - Base64.decode(data), + msg.data, Sodium.KICKED_DOMAIN, closedGroupSessionId, ) if (decoded != null) { - Log.d("GroupPoller", "decoded kick message was for us") + Log.d(TAG, "decoded kick message was for us") val message = decoded.decodeToString() if (Sodium.KICKED_REGEX.matches(message)) { val (sessionId, generation) = message.split("-") @@ -279,44 +263,31 @@ class ClosedGroupPoller( } } } - } } - private fun handleKeyPoll(response: RawResponse) { - // get all the data to hash objects and process them - val allMessages = parseMessages(response) - if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Total key messages this poll: ${allMessages.size}") - var total = 0 - allMessages.forEach { (message, hash, timestamp) -> - configFactoryProtocol.withMutableGroupConfigs(closedGroupSessionId) { configs -> - if (configs.loadKeys(message, hash, timestamp)) { - total++ - } - } - - if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Merged $hash for keys on ${closedGroupSessionId.hexString}") + private fun handleGroupConfigMessages( + keysResponse: RetrieveMessageResponse, + infoResponse: RetrieveMessageResponse, + membersResponse: RetrieveMessageResponse + ) { + if (keysResponse.messages.isEmpty() && infoResponse.messages.isEmpty() && membersResponse.messages.isEmpty()) { + return } - if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Total key messages consumed: $total") - } - private fun handleInfo(response: RawResponse) { - val messages = parseMessages(response) - messages.forEach { (message, hash, _) -> - configFactoryProtocol.withMutableGroupConfigs(closedGroupSessionId) { configs -> - configs.groupInfo.merge(arrayOf(hash to message)) - } - if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Merged $hash for info on ${closedGroupSessionId.hexString}") - } - } + Log.d( + TAG, "Handling group config messages(" + + "info = ${infoResponse.messages.size}, " + + "keys = ${keysResponse.messages.size}, " + + "members = ${membersResponse.messages.size})" + ) - private fun handleMembers(response: RawResponse) { - parseMessages(response).forEach { (message, hash, _) -> - configFactoryProtocol.withMutableGroupConfigs(closedGroupSessionId) { configs -> - configs.groupMembers.merge(arrayOf(hash to message)) - } - if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Merged $hash for members on ${closedGroupSessionId.hexString}") - } + configFactoryProtocol.mergeGroupConfigMessages( + groupId = closedGroupSessionId, + keys = keysResponse.messages.map { it.toConfigMessage() }, + info = infoResponse.messages.map { it.toConfigMessage() }, + members = membersResponse.messages.map { it.toConfigMessage() }, + ) } private fun handleMessages(body: RawResponse, snode: Snode) { @@ -342,8 +313,8 @@ class ClosedGroupPoller( JobQueue.shared.add(job) } - if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "namespace for messages rx count: ${messages.size}") - + if (messages.isNotEmpty()) { + Log.d(TAG, "namespace for messages rx count: ${messages.size}") + } } - } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt index b12c9065fa..56357fd837 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt @@ -3,25 +3,14 @@ package org.session.libsession.messaging.sending_receiving.pollers import android.util.SparseArray import androidx.core.util.valueIterator import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.async -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.runBlocking -import network.loki.messenger.libsession_util.ConfigBase -import network.loki.messenger.libsession_util.Contacts -import network.loki.messenger.libsession_util.ConversationVolatileConfig -import network.loki.messenger.libsession_util.MutableConfig -import network.loki.messenger.libsession_util.MutableContacts -import network.loki.messenger.libsession_util.MutableConversationVolatileConfig -import network.loki.messenger.libsession_util.MutableUserGroupsConfig -import network.loki.messenger.libsession_util.MutableUserProfile -import network.loki.messenger.libsession_util.UserGroupsConfig -import network.loki.messenger.libsession_util.UserProfile import nl.komponents.kovenant.Deferred import nl.komponents.kovenant.Promise import nl.komponents.kovenant.deferred import nl.komponents.kovenant.functional.bind import nl.komponents.kovenant.resolve import nl.komponents.kovenant.task +import org.session.libsession.database.StorageProtocol import org.session.libsession.database.userAuth import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.jobs.BatchMessageReceiveJob @@ -31,8 +20,9 @@ import org.session.libsession.snode.RawResponse import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeModule import org.session.libsession.utilities.ConfigFactoryProtocol -import org.session.libsession.utilities.Contact.Name -import org.session.libsession.utilities.MutableGroupConfigs +import org.session.libsession.utilities.ConfigMessage +import org.session.libsession.utilities.UserConfigType +import org.session.libsignal.database.LokiAPIDatabaseProtocol import org.session.libsignal.utilities.Base64 import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Namespace @@ -46,8 +36,14 @@ private const val TAG = "Poller" private class PromiseCanceledException : Exception("Promise canceled.") -class Poller(private val configFactory: ConfigFactoryProtocol) { - var userPublicKey = MessagingModuleConfiguration.shared.storage.getUserPublicKey() ?: "" +class Poller( + private val configFactory: ConfigFactoryProtocol, + private val storage: StorageProtocol, + private val lokiApiDatabase: LokiAPIDatabaseProtocol +) { + private val userPublicKey: String + get() = storage.getUserPublicKey().orEmpty() + private var hasStarted: Boolean = false private val usedSnodes: MutableSet = mutableSetOf() var isCaughtUp = false @@ -74,12 +70,14 @@ class Poller(private val configFactory: ConfigFactoryProtocol) { } fun retrieveUserProfile() { - Log.d(TAG, "Retrieving user profile.") + Log.d(TAG, "Retrieving user profile. for key = $userPublicKey") SnodeAPI.getSwarm(userPublicKey).bind { usedSnodes.clear() deferred().also { pollNextSnode(userProfileOnly = true, it) }.promise + }.fail { + Log.e(TAG, "Failed to retrieve user profile.", it) } } // endregion @@ -144,8 +142,9 @@ class Poller(private val configFactory: ConfigFactoryProtocol) { } } - private fun processConfig(snode: Snode, rawMessages: RawResponse, namespace: Int, forConfig: Class) { + private fun processConfig(snode: Snode, rawMessages: RawResponse, forConfig: UserConfigType) { val messages = rawMessages["messages"] as? List<*> + val namespace = forConfig.namespace val processed = if (!messages.isNullOrEmpty()) { SnodeAPI.updateLastMessageHashValueIfPossible(snode, userPublicKey, messages, namespace) SnodeAPI.removeDuplicates(userPublicKey, messages, namespace, true).mapNotNull { rawMessageAsJSON -> @@ -153,24 +152,21 @@ class Poller(private val configFactory: ConfigFactoryProtocol) { val b64EncodedBody = rawMessageAsJSON["data"] as? String ?: return@mapNotNull null val timestamp = rawMessageAsJSON["t"] as? Long ?: SnodeAPI.nowWithOffset val body = Base64.decode(b64EncodedBody) - Triple(body, hashValue, timestamp) + ConfigMessage(data = body, hash = hashValue, timestamp = timestamp) } } else emptyList() if (processed.isEmpty()) return - processed.forEach { (body, hash, _) -> - try { - configFactory.withMutableUserConfigs { configs -> - configs - .allConfigs() - .filter { it.javaClass.isInstance(forConfig) } - .first() - .merge(arrayOf(hash to body)) - } - } catch (e: Exception) { - Log.e(TAG, e) - } + Log.i(TAG, "Processing ${processed.size} messages for $forConfig") + + try { + configFactory.mergeUserConfigs( + userConfigType = forConfig, + messages = processed, + ) + } catch (e: Exception) { + Log.e(TAG, e) } } @@ -182,57 +178,57 @@ class Poller(private val configFactory: ConfigFactoryProtocol) { } private fun pollUserProfile(snode: Snode, deferred: Deferred): Promise = task { - runBlocking(Dispatchers.IO) { - val requests = mutableListOf() - val hashesToExtend = mutableSetOf() - val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth) + val requests = mutableListOf() + val hashesToExtend = mutableSetOf() + val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth) - configFactory.withUserConfigs { - val config = it.userProfile - hashesToExtend += config.currentHashes() - SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - snode = snode, + configFactory.withUserConfigs { + hashesToExtend += it.userProfile.currentHashes() + } + + requests += SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + lastHash = lokiApiDatabase.getLastMessageHashValue( + snode = snode, + publicKey = userAuth.accountId.hexString, + namespace = Namespace.USER_PROFILE() + ), + auth = userAuth, + namespace = Namespace.USER_PROFILE(), + maxSize = -8 + ) + + if (hashesToExtend.isNotEmpty()) { + SnodeAPI.buildAuthenticatedAlterTtlBatchRequest( + messageHashes = hashesToExtend.toList(), auth = userAuth, - namespace = config.namespace(), - maxSize = -8 - ) - }.let { request -> - requests += request + newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds, + extend = true + ).let { extensionRequest -> + requests += extensionRequest } + } - if (hashesToExtend.isNotEmpty()) { - SnodeAPI.buildAuthenticatedAlterTtlBatchRequest( - messageHashes = hashesToExtend.toList(), - auth = userAuth, - newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds, - extend = true - ).let { extensionRequest -> - requests += extensionRequest - } - } - - if (requests.isNotEmpty()) { - SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses -> - isCaughtUp = true - if (!deferred.promise.isDone()) { - val responseList = (rawResponses["results"] as List) - responseList.getOrNull(0)?.let { rawResponse -> - if (rawResponse["code"] as? Int != 200) { - Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") + if (requests.isNotEmpty()) { + SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses -> + isCaughtUp = true + if (!deferred.promise.isDone()) { + val responseList = (rawResponses["results"] as List) + responseList.getOrNull(0)?.let { rawResponse -> + if (rawResponse["code"] as? Int != 200) { + Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") + } else { + val body = rawResponse["body"] as? RawResponse + if (body == null) { + Log.e(TAG, "Batch sub-request didn't contain a body") } else { - val body = rawResponse["body"] as? RawResponse - if (body == null) { - Log.e(TAG, "Batch sub-request didn't contain a body") - } else { - processConfig(snode, body, Namespace.USER_PROFILE(), MutableUserProfile::class.java) - } + processConfig(snode, body, UserConfigType.USER_PROFILE) } } } - Promise.ofSuccess(Unit) - }.fail { - Log.e(TAG, "Failed to get raw batch response", it) } + Promise.ofSuccess(Unit) + }.fail { + Log.e(TAG, "Failed to get raw batch response", it) } } } @@ -245,23 +241,37 @@ class Poller(private val configFactory: ConfigFactoryProtocol) { val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth) val requestSparseArray = SparseArray() // get messages - SnodeAPI.buildAuthenticatedRetrieveBatchRequest(snode, auth = userAuth, maxSize = -2) + SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + lastHash = lokiApiDatabase.getLastMessageHashValue( + snode = snode, + publicKey = userAuth.accountId.hexString, + namespace = Namespace.DEFAULT() + ), + auth = userAuth, + maxSize = -2) .also { personalMessages -> // namespaces here should always be set requestSparseArray[personalMessages.namespace!!] = personalMessages } // get the latest convo info volatile val hashesToExtend = mutableSetOf() - configFactory.withUserConfigs { - it.allConfigs().map { config -> - hashesToExtend += config.currentHashes() - config.namespace() to SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - snode = snode, - auth = userAuth, - namespace = config.namespace(), - maxSize = -8 - ) - } + configFactory.withUserConfigs { configs -> + UserConfigType + .entries + .map { type -> + val config = configs.getConfig(type) + hashesToExtend += config.currentHashes() + type.namespace to SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + lastHash = lokiApiDatabase.getLastMessageHashValue( + snode = snode, + publicKey = userAuth.accountId.hexString, + namespace = type.namespace + ), + auth = userAuth, + namespace = type.namespace, + maxSize = -8 + ) + } }.forEach { (namespace, request) -> // namespaces here should always be set requestSparseArray[namespace] = request @@ -290,29 +300,24 @@ class Poller(private val configFactory: ConfigFactoryProtocol) { val responseList = (rawResponses["results"] as List) // in case we had null configs, the array won't be fully populated // index of the sparse array key iterator should be the request index, with the key being the namespace - sequenceOf( - Namespace.USER_PROFILE() to MutableUserProfile::class.java, - Namespace.CONTACTS() to MutableContacts::class.java, - Namespace.GROUPS() to MutableUserGroupsConfig::class.java, - Namespace.CONVO_INFO_VOLATILE() to MutableConversationVolatileConfig::class.java - ).map { (namespace, configClass) -> - Triple(namespace, configClass, requestSparseArray.indexOfKey(namespace)) - }.filter { (_, _, i) -> i >= 0 } - .forEach { (namespace, configClass, requestIndex) -> - responseList.getOrNull(requestIndex)?.let { rawResponse -> - if (rawResponse["code"] as? Int != 200) { - Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") - return@forEach + UserConfigType.entries + .map { type -> type to requestSparseArray.indexOfKey(type.namespace) } + .filter { (_, i) -> i >= 0 } + .forEach { (configType, requestIndex) -> + responseList.getOrNull(requestIndex)?.let { rawResponse -> + if (rawResponse["code"] as? Int != 200) { + Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") + return@forEach + } + val body = rawResponse["body"] as? RawResponse + if (body == null) { + Log.e(TAG, "Batch sub-request didn't contain a body") + return@forEach + } + + processConfig(snode, body, configType) } - val body = rawResponse["body"] as? RawResponse - if (body == null) { - Log.e(TAG, "Batch sub-request didn't contain a body") - return@forEach - } - - processConfig(snode, body, namespace, configClass) } - } // the first response will be the personal messages (we want these to be processed after config messages) val personalResponseIndex = requestSparseArray.indexOfKey(Namespace.DEFAULT()) diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/PollerException.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/PollerException.kt deleted file mode 100644 index 327965463d..0000000000 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/PollerException.kt +++ /dev/null @@ -1,15 +0,0 @@ -package org.session.libsession.messaging.sending_receiving.pollers - -/** - * Exception thrown by a Poller-family when multiple error could have occurred. - */ -class PollerException(message: String, errors: List) : RuntimeException( - message, - errors.firstOrNull() -) { - init { - errors.asSequence() - .drop(1) - .forEach(this::addSuppressed) - } -} diff --git a/libsession/src/main/java/org/session/libsession/messaging/utilities/UpdateMessageBuilder.kt b/libsession/src/main/java/org/session/libsession/messaging/utilities/UpdateMessageBuilder.kt index 75c5eb04c7..ce3db51acb 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/utilities/UpdateMessageBuilder.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/utilities/UpdateMessageBuilder.kt @@ -192,7 +192,7 @@ object UpdateMessageBuilder { .put(OTHER_NAME_KEY, context.youOrSender(updateData.sessionIds.first { it != userPublicKey })) .format() number == 2 -> Phrase.from(context, - R.string.groupMemberNewMultiple) + R.string.groupMemberNewTwo) .put(NAME_KEY, context.youOrSender(updateData.sessionIds.first())) .put(OTHER_NAME_KEY, context.youOrSender(updateData.sessionIds.last())) .format() diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index 1cc83c66b3..38853faa6c 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -510,8 +510,8 @@ object SnodeAPI { } fun buildAuthenticatedRetrieveBatchRequest( - snode: Snode, auth: SwarmAuth, + lastHash: String?, namespace: Int = 0, maxSize: Int? = null ): SnodeBatchRequestInfo { @@ -520,7 +520,7 @@ object SnodeAPI { auth = auth, verificationData = { ns, t -> "${Snode.Method.Retrieve.rawValue}$ns$t" }, ) { - put("last_hash", database.getLastMessageHashValue(snode, auth.accountId.hexString, namespace).orEmpty()) + put("last_hash", lastHash.orEmpty()) if (maxSize != null) { put("max_size", maxSize) } @@ -639,13 +639,15 @@ object SnodeAPI { return@batch } + // For each response, parse the result, match it with the request then send + // back through the request's callback. for ((req, resp) in batch.zip(responses.results)) { - val result = if (resp.code != 200) { - Result.failure(RuntimeException("Error with code = ${resp.code}, msg = ${resp.body}")) - } else { - runCatching { - JsonUtil.fromJson(resp.body, req.responseType) + val result = runCatching { + check(resp.code == 200) { + "Error with code = ${resp.code}, msg = ${resp.body}" } + + JsonUtil.fromJson(resp.body, req.responseType) } req.callback.send(result) diff --git a/libsession/src/main/java/org/session/libsession/snode/model/MessageResponses.kt b/libsession/src/main/java/org/session/libsession/snode/model/MessageResponses.kt new file mode 100644 index 0000000000..0e3cd270cf --- /dev/null +++ b/libsession/src/main/java/org/session/libsession/snode/model/MessageResponses.kt @@ -0,0 +1,20 @@ +package org.session.libsession.snode.model + +import com.fasterxml.jackson.annotation.JsonCreator +import com.fasterxml.jackson.annotation.JsonProperty + +data class StoreMessageResponse @JsonCreator constructor( + @JsonProperty("hash") val hash: String, + @JsonProperty("t") val timestamp: Long, +) + +class RetrieveMessageResponse @JsonCreator constructor( + @JsonProperty("messages") val messages: List, +) { + class Message @JsonCreator constructor( + @JsonProperty("hash") val hash: String, + @JsonProperty("t") val timestamp: Long, + // Jackson is able to deserialize byte arrays from base64 strings + @JsonProperty("data") val data: ByteArray, + ) +} \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/snode/model/StoreMessageResponse.kt b/libsession/src/main/java/org/session/libsession/snode/model/StoreMessageResponse.kt deleted file mode 100644 index 6cd05fbbc8..0000000000 --- a/libsession/src/main/java/org/session/libsession/snode/model/StoreMessageResponse.kt +++ /dev/null @@ -1,9 +0,0 @@ -package org.session.libsession.snode.model - -import com.fasterxml.jackson.annotation.JsonCreator -import com.fasterxml.jackson.annotation.JsonProperty - -data class StoreMessageResponse @JsonCreator constructor( - @JsonProperty("hash") val hash: String, - @JsonProperty("t") val timestamp: Long, -) diff --git a/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt b/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt index 1ee3b58098..9b24484408 100644 --- a/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt @@ -17,14 +17,18 @@ import network.loki.messenger.libsession_util.ReadableGroupKeysConfig import network.loki.messenger.libsession_util.ReadableGroupMembersConfig import network.loki.messenger.libsession_util.ReadableUserGroupsConfig import network.loki.messenger.libsession_util.ReadableUserProfile +import network.loki.messenger.libsession_util.util.ConfigPush +import network.loki.messenger.libsession_util.util.GroupInfo import org.session.libsession.snode.SwarmAuth import org.session.libsignal.utilities.AccountId +import org.session.libsignal.utilities.Namespace interface ConfigFactoryProtocol { val configUpdateNotifications: Flow fun withUserConfigs(cb: (UserConfigs) -> T): T fun withMutableUserConfigs(cb: (MutableUserConfigs) -> T): T + fun mergeUserConfigs(userConfigType: UserConfigType, messages: List) fun withGroupConfigs(groupId: AccountId, cb: (GroupConfigs) -> T): T fun withMutableGroupConfigs(groupId: AccountId, cb: (MutableGroupConfigs) -> T): T @@ -39,8 +43,52 @@ interface ConfigFactoryProtocol { domain: String, closedGroupSessionId: AccountId): ByteArray? + fun mergeGroupConfigMessages( + groupId: AccountId, + keys: List, + info: List, + members: List + ) + + fun confirmUserConfigsPushed( + contacts: Pair? = null, + userProfile: Pair? = null, + convoInfoVolatile: Pair? = null, + userGroups: Pair? = null + ) + + fun confirmGroupConfigsPushed( + groupId: AccountId, + members: Pair?, + info: Pair?, + keysPush: ConfigPushResult? + ) } +class ConfigMessage( + val hash: String, + val data: ByteArray, + val timestamp: Long +) + +data class ConfigPushResult( + val hash: String, + val timestamp: Long +) + +enum class UserConfigType(val namespace: Int) { + CONTACTS(Namespace.CONTACTS()), + USER_PROFILE(Namespace.USER_PROFILE()), + CONVO_INFO_VOLATILE(Namespace.CONVO_INFO_VOLATILE()), + USER_GROUPS(Namespace.GROUPS()), +} + +/** + * Shortcut to get the group info for a closed group. Equivalent to: `withUserConfigs { it.userGroups.getClosedGroup(groupId) }` + */ +fun ConfigFactoryProtocol.getClosedGroup(groupId: AccountId): GroupInfo.ClosedGroupInfo? { + return withUserConfigs { it.userGroups.getClosedGroup(groupId.hexString) } +} interface UserConfigs { val contacts: ReadableContacts @@ -48,7 +96,14 @@ interface UserConfigs { val userProfile: ReadableUserProfile val convoInfoVolatile: ReadableConversationVolatileConfig - fun allConfigs(): Sequence = sequenceOf(contacts, userGroups, userProfile, convoInfoVolatile) + fun getConfig(type: UserConfigType): ReadableConfig { + return when (type) { + UserConfigType.CONTACTS -> contacts + UserConfigType.USER_PROFILE -> userProfile + UserConfigType.CONVO_INFO_VOLATILE -> convoInfoVolatile + UserConfigType.USER_GROUPS -> userGroups + } + } } interface MutableUserConfigs : UserConfigs { @@ -57,7 +112,14 @@ interface MutableUserConfigs : UserConfigs { override val userProfile: MutableUserProfile override val convoInfoVolatile: MutableConversationVolatileConfig - override fun allConfigs(): Sequence = sequenceOf(contacts, userGroups, userProfile, convoInfoVolatile) + override fun getConfig(type: UserConfigType): MutableConfig { + return when (type) { + UserConfigType.CONTACTS -> contacts + UserConfigType.USER_PROFILE -> userProfile + UserConfigType.CONVO_INFO_VOLATILE -> convoInfoVolatile + UserConfigType.USER_GROUPS -> userGroups + } + } } interface GroupConfigs { @@ -71,8 +133,7 @@ interface MutableGroupConfigs : GroupConfigs { override val groupMembers: MutableGroupMembersConfig override val groupKeys: MutableGroupKeysConfig - fun loadKeys(message: ByteArray, hash: String, timestamp: Long): Boolean - fun rekeys() + fun rekey() } sealed interface ConfigUpdateNotification { @@ -80,30 +141,3 @@ sealed interface ConfigUpdateNotification { data class GroupConfigsUpdated(val groupId: AccountId) : ConfigUpdateNotification data class GroupConfigsDeleted(val groupId: AccountId) : ConfigUpdateNotification } - -//interface ConfigFactoryUpdateListener { -// fun notifyUpdates(forConfigObject: Config, messageTimestamp: Long) -//} - - - -///** -// * Access group configs if they exist, otherwise return null. -// * -// * Note: The config objects will be closed after the callback is executed. Any attempt -// * to store the config objects will result in a native crash. -// */ -//inline fun ConfigFactoryProtocol.withGroupConfigsOrNull( -// groupId: AccountId, -// cb: (GroupInfoConfig, GroupMembersConfig, GroupKeysConfig) -> T -//): T? { -// getGroupInfoConfig(groupId)?.use { groupInfo -> -// getGroupMemberConfig(groupId)?.use { groupMembers -> -// getGroupKeysConfig(groupId, groupInfo, groupMembers)?.use { groupKeys -> -// return cb(groupInfo, groupMembers, groupKeys) -// } -// } -// } -// -// return null -//} \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/utilities/TextSecurePreferences.kt b/libsession/src/main/java/org/session/libsession/utilities/TextSecurePreferences.kt index faff912ed1..0328575bda 100644 --- a/libsession/src/main/java/org/session/libsession/utilities/TextSecurePreferences.kt +++ b/libsession/src/main/java/org/session/libsession/utilities/TextSecurePreferences.kt @@ -41,8 +41,6 @@ import javax.inject.Singleton interface TextSecurePreferences { - fun getLastConfigurationSyncTime(): Long - fun setLastConfigurationSyncTime(value: Long) fun getConfigurationMessageSynced(): Boolean fun setConfigurationMessageSynced(value: Boolean) @@ -108,6 +106,7 @@ interface TextSecurePreferences { fun setUpdateApkDigest(value: String?) fun getUpdateApkDigest(): String? fun getLocalNumber(): String? + fun watchLocalNumber(): StateFlow fun getHasLegacyConfig(): Boolean fun setHasLegacyConfig(newValue: Boolean) fun setLocalNumber(localNumber: String) @@ -203,6 +202,10 @@ interface TextSecurePreferences { @JvmStatic var pushSuffix = "" + + // This is a stop-gap solution for static access to shared preference. + internal lateinit var preferenceInstance: TextSecurePreferences + const val DISABLE_PASSPHRASE_PREF = "pref_disable_passphrase" const val LANGUAGE_PREF = "pref_language" const val THREAD_TRIM_NOW = "pref_trim_now" @@ -219,7 +222,7 @@ interface TextSecurePreferences { const val SCREEN_SECURITY_PREF = "pref_screen_security" const val ENTER_SENDS_PREF = "pref_enter_sends" const val THREAD_TRIM_ENABLED = "pref_trim_threads" - const val LOCAL_NUMBER_PREF = "pref_local_number" + internal const val LOCAL_NUMBER_PREF = "pref_local_number" const val REGISTERED_GCM_PREF = "pref_gcm_registered" const val UPDATE_APK_REFRESH_TIME_PREF = "pref_update_apk_refresh_time" const val UPDATE_APK_DOWNLOAD_ID = "pref_update_apk_download_id" @@ -265,7 +268,6 @@ interface TextSecurePreferences { const val GIF_METADATA_WARNING = "has_seen_gif_metadata_warning" const val GIF_GRID_LAYOUT = "pref_gif_grid_layout" val IS_PUSH_ENABLED get() = "pref_is_using_fcm$pushSuffix" - const val LAST_CONFIGURATION_SYNC_TIME = "pref_last_configuration_sync_time" const val CONFIGURATION_SYNCED = "pref_configuration_synced" const val LAST_PROFILE_UPDATE_TIME = "pref_last_profile_update_time" const val LAST_OPEN_DATE = "pref_last_open_date" @@ -308,16 +310,16 @@ interface TextSecurePreferences { // for the lifetime of the Session installation. const val HAVE_WARNED_USER_ABOUT_SAVING_ATTACHMENTS = "libsession.HAVE_WARNED_USER_ABOUT_SAVING_ATTACHMENTS" - @JvmStatic - fun getLastConfigurationSyncTime(context: Context): Long { - return getLongPreference(context, LAST_CONFIGURATION_SYNC_TIME, 0) - } - - @JvmStatic - fun setLastConfigurationSyncTime(context: Context, value: Long) { - setLongPreference(context, LAST_CONFIGURATION_SYNC_TIME, value) - } - +// @JvmStatic +// fun getLastConfigurationSyncTime(context: Context): Long { +// return getLongPreference(context, LAST_CONFIGURATION_SYNC_TIME, 0) +// } +// +// @JvmStatic +// fun setLastConfigurationSyncTime(context: Context, value: Long) { +// setLongPreference(context, LAST_CONFIGURATION_SYNC_TIME, value) +// } +// @JvmStatic fun getConfigurationMessageSynced(context: Context): Boolean { return getBooleanPreference(context, CONFIGURATION_SYNCED, false) @@ -629,9 +631,13 @@ interface TextSecurePreferences { return getStringPreference(context, UPDATE_APK_DIGEST, null) } + @Deprecated( + "Use the dependency-injected TextSecurePreference instance instead", + ReplaceWith("TextSecurePreferences.getLocalNumber()") + ) @JvmStatic fun getLocalNumber(context: Context): String? { - return getStringPreference(context, LOCAL_NUMBER_PREF, null) + return preferenceInstance.getLocalNumber() } @JvmStatic @@ -645,13 +651,6 @@ interface TextSecurePreferences { _events.tryEmit(HAS_RECEIVED_LEGACY_CONFIG) } - fun setLocalNumber(context: Context, localNumber: String) { - setStringPreference(context, LOCAL_NUMBER_PREF, localNumber.toLowerCase()) - } - - fun removeLocalNumber(context: Context) { - removePreference(context, LOCAL_NUMBER_PREF) - } @JvmStatic fun isEnterSendsEnabled(context: Context): Boolean { @@ -994,14 +993,12 @@ interface TextSecurePreferences { class AppTextSecurePreferences @Inject constructor( @ApplicationContext private val context: Context ): TextSecurePreferences { - - override fun getLastConfigurationSyncTime(): Long { - return getLongPreference(TextSecurePreferences.LAST_CONFIGURATION_SYNC_TIME, 0) + init { + // Should remove once all static access to the companion objects is removed + TextSecurePreferences.preferenceInstance = this } - override fun setLastConfigurationSyncTime(value: Long) { - setLongPreference(TextSecurePreferences.LAST_CONFIGURATION_SYNC_TIME, value) - } + private val localNumberState = MutableStateFlow(getStringPreference(TextSecurePreferences.LOCAL_NUMBER_PREF, null)) override fun getConfigurationMessageSynced(): Boolean { return getBooleanPreference(TextSecurePreferences.CONFIGURATION_SYNCED, false) @@ -1262,7 +1259,11 @@ class AppTextSecurePreferences @Inject constructor( } override fun getLocalNumber(): String? { - return getStringPreference(TextSecurePreferences.LOCAL_NUMBER_PREF, null) + return localNumberState.value + } + + override fun watchLocalNumber(): StateFlow { + return localNumberState } override fun getHasLegacyConfig(): Boolean { @@ -1275,10 +1276,13 @@ class AppTextSecurePreferences @Inject constructor( } override fun setLocalNumber(localNumber: String) { - setStringPreference(TextSecurePreferences.LOCAL_NUMBER_PREF, localNumber.toLowerCase()) + val normalised = localNumber.lowercase() + setStringPreference(TextSecurePreferences.LOCAL_NUMBER_PREF, normalised) + localNumberState.value = normalised } override fun removeLocalNumber() { + localNumberState.value = null removePreference(TextSecurePreferences.LOCAL_NUMBER_PREF) }