diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index dcb2e8b08f..1cf7b757bc 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -48,6 +48,7 @@ import org.session.libsession.messaging.notifications.TokenFetcher; import org.session.libsession.messaging.sending_receiving.notifications.MessageNotifier; import org.session.libsession.messaging.sending_receiving.pollers.LegacyClosedGroupPollerV2; import org.session.libsession.messaging.sending_receiving.pollers.Poller; +import org.session.libsession.snode.SnodeClock; import org.session.libsession.snode.SnodeModule; import org.session.libsession.utilities.Address; import org.session.libsession.utilities.Device; @@ -165,6 +166,7 @@ public class ApplicationContext extends Application implements DefaultLifecycleO MessagingModuleConfiguration messagingModuleConfiguration; @Inject ConfigSyncHandler configSyncHandler; @Inject RemoveGroupMemberHandler removeGroupMemberHandler; + @Inject SnodeClock snodeClock; private volatile boolean isAppVisible; @@ -236,7 +238,8 @@ public class ApplicationContext extends Application implements DefaultLifecycleO lastSentTimestampCache, this, tokenFetcher, - groupManagerV2 + groupManagerV2, + snodeClock ); callMessageProcessor = new CallMessageProcessor(this, textSecurePreferences, ProcessLifecycleOwner.get().getLifecycle(), storage); Log.i(TAG, "onCreate()"); @@ -270,6 +273,7 @@ public class ApplicationContext extends Application implements DefaultLifecycleO pushRegistrationHandler.run(); configSyncHandler.start(); removeGroupMemberHandler.start(); + snodeClock.start(); // add our shortcut debug menu if we are not in a release build if (BuildConfig.BUILD_TYPE != "release") { diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt index 58ef0cd562..b17fde5c33 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt @@ -327,8 +327,6 @@ class ConfigFactory @Inject constructor( cb(configs as GroupConfigsImpl) } - Log.d("ConfigFactory", "Group updated? $groupId: $changed") - if (changed) { if (!_configUpdateNotifications.tryEmit(ConfigUpdateNotification.GroupConfigsUpdated(groupId))) { Log.e("ConfigFactory", "Unable to deliver group update notification") @@ -351,6 +349,7 @@ class ConfigFactory @Inject constructor( override fun removeGroup(groupId: AccountId) { withMutableUserConfigs { it.userGroups.eraseClosedGroup(groupId.hexString) + it.convoInfoVolatile.eraseClosedGroup(groupId.hexString) } if (groupConfigs.remove(groupId) != null) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt index ea006b7196..c6c513fad5 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt @@ -14,6 +14,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.GlobalScope import org.session.libsession.database.StorageProtocol import org.session.libsession.messaging.groups.GroupManagerV2 +import org.session.libsession.snode.SnodeClock import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsignal.database.LokiAPIDatabaseProtocol import org.thoughtcrime.securesms.database.ConfigDatabase @@ -52,4 +53,8 @@ object SessionUtilModule { storage = storage, lokiApiDatabase = lokiApiDatabase, ) + + @Provides + @Singleton + fun provideSnodeClock() = SnodeClock() } \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/EditGroupViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/EditGroupViewModel.kt index 49acc4bad5..7218901b71 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/EditGroupViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/EditGroupViewModel.kt @@ -8,6 +8,7 @@ import dagger.assisted.AssistedInject import dagger.hilt.android.lifecycle.HiltViewModel import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.async import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharingStarted @@ -242,14 +243,13 @@ class EditGroupViewModel @AssistedInject constructor( mutableInProgress.value = true // We need to use GlobalScope here because we don't want - // "removeMember" to be cancelled when the view model is cleared. This operation - // is expected to complete even if the view model is cleared. - val task = GlobalScope.launch { + // any group operation to be cancelled when the view model is cleared. + val task = GlobalScope.async { operation() } try { - task.join() + task.await() } catch (e: Exception) { mutableError.value = e.localizedMessage.orEmpty() } finally { diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt index 8bd89f2c53..5b8ebab026 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt @@ -3,7 +3,10 @@ package org.thoughtcrime.securesms.groups import android.content.Context import com.google.protobuf.ByteString import dagger.hilt.android.qualifiers.ApplicationContext +import kotlinx.coroutines.Deferred import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async import kotlinx.coroutines.withContext import network.loki.messenger.libsession_util.ConfigBase.Companion.PRIORITY_VISIBLE @@ -12,7 +15,6 @@ import network.loki.messenger.libsession_util.util.GroupInfo import network.loki.messenger.libsession_util.util.GroupMember import network.loki.messenger.libsession_util.util.INVITE_STATUS_FAILED import network.loki.messenger.libsession_util.util.INVITE_STATUS_SENT -import network.loki.messenger.libsession_util.util.Sodium import network.loki.messenger.libsession_util.util.UserPic import org.session.libsession.database.StorageProtocol import org.session.libsession.database.userAuth @@ -38,6 +40,7 @@ import org.session.libsession.utilities.Address import org.session.libsession.utilities.SSKEnvironment import org.session.libsession.utilities.getClosedGroup import org.session.libsession.utilities.recipients.Recipient +import org.session.libsession.utilities.waitUntilGroupConfigsPushed import org.session.libsignal.messages.SignalServiceGroup import org.session.libsignal.protos.SignalServiceProtos.DataMessage import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateDeleteMemberContentMessage @@ -76,10 +79,11 @@ class GroupManagerV2Impl @Inject constructor( * @throws IllegalArgumentException if the group does not exist or no admin key is found. */ private fun requireAdminAccess(group: AccountId): ByteArray { - return checkNotNull(configFactory - .withUserConfigs { it.userGroups.getClosedGroup(group.hexString) } - ?.adminKey - ?.takeIf { it.isNotEmpty() }) { "Only admin is allowed to invite members" } + return checkNotNull( + configFactory.getClosedGroup(group) + ?.adminKey + ?.takeIf { it.isNotEmpty() } + ) { "Only admin is allowed to invite members" } } override suspend fun createGroup( @@ -95,7 +99,9 @@ class GroupManagerV2Impl @Inject constructor( // Create a group in the user groups config val group = configFactory.withMutableUserConfigs { configs -> - configs.userGroups.createGroup().also(configs.userGroups::set) + configs.userGroups.createGroup() + .copy(name = groupName) + .also(configs.userGroups::set) } checkNotNull(group.adminKey) { "Admin key is null for new group creation." } @@ -133,6 +139,10 @@ class GroupManagerV2Impl @Inject constructor( configs.rekey() } + if (!configFactory.waitUntilGroupConfigsPushed(groupId)) { + Log.w(TAG, "Unable to push group configs in a timely manner") + } + configFactory.withMutableUserConfigs { it.convoInfoVolatile.set( Conversation.ClosedGroup( @@ -281,77 +291,105 @@ class GroupManagerV2Impl @Inject constructor( removedMembers: List, removeMessages: Boolean ) { - doRemoveMembers( + flagMembersForRemoval( group = groupAccountId, - removedMembers = removedMembers, - sendRemovedMessage = true, - removeMemberMessages = removeMessages + members = removedMembers, + alsoRemoveMembersMessage = removeMessages, + sendMemberChangeMessage = true ) } - override suspend fun handleMemberLeft(message: GroupUpdated, closedGroupId: AccountId) { - val closedGroupHexString = closedGroupId.hexString - val closedGroup = - configFactory.withUserConfigs { it.userGroups.getClosedGroup(closedGroupId.hexString) } - ?: return + override suspend fun removeMemberMessages( + groupAccountId: AccountId, + members: List + ): Unit = withContext(dispatcher) { + val messagesToDelete = mutableListOf() + + val threadId = storage.getThreadId(Address.fromSerialized(groupAccountId.hexString)) + if (threadId != null) { + for (member in members) { + for (msg in mmsSmsDatabase.getUserMessages(threadId, member.hexString)) { + val serverHash = lokiDatabase.getMessageServerHash(msg.id, msg.isMms) + if (serverHash != null) { + messagesToDelete.add(serverHash) + } + } + + storage.deleteMessagesByUser(threadId, member.hexString) + } + } + + if (messagesToDelete.isEmpty()) { + return@withContext + } + + val groupAdminAuth = configFactory.getClosedGroup(groupAccountId)?.adminKey?.let { + OwnedSwarmAuth.ofClosedGroup(groupAccountId, it) + } ?: return@withContext + + SnodeAPI.deleteMessage(groupAccountId.hexString, groupAdminAuth, messagesToDelete).await() + } + + override suspend fun handleMemberLeft(message: GroupUpdated, group: AccountId) { + val closedGroup = configFactory.getClosedGroup(group) ?: return + if (closedGroup.hasAdminKey()) { - // re-key and do a new config removing the previous member - doRemoveMembers( - closedGroupId, - listOf(AccountId(message.sender!!)), - sendRemovedMessage = false, - removeMemberMessages = false + flagMembersForRemoval( + group = group, + members = listOf(AccountId(message.sender!!)), + alsoRemoveMembersMessage = false, + sendMemberChangeMessage = false ) } else { - val hasAnyAdminRemaining = configFactory.withGroupConfigs(closedGroupId) { configs -> + val hasAnyAdminRemaining = configFactory.withGroupConfigs(group) { configs -> configs.groupMembers.all() .asSequence() .filterNot { it.sessionId == message.sender } .any { it.admin && !it.removed } } - // if the leaving member is an admin, disable the group and remove it - // This is just to emulate the "existing" group behaviour, this will need to be removed in future + // if the leaving member is last admin, disable the group and remove it + // This is just to emulate the "existing" group behaviour, this will probably be removed in future if (!hasAnyAdminRemaining) { - pollerFactory.pollerFor(closedGroupId)?.stop() - storage.getThreadId(Address.fromSerialized(closedGroupHexString)) + pollerFactory.pollerFor(group)?.stop() + storage.getThreadId(Address.fromSerialized(group.hexString)) ?.let(storage::deleteConversation) - configFactory.removeGroup(closedGroupId) + configFactory.removeGroup(group) } } } override suspend fun leaveGroup(group: AccountId, deleteOnLeave: Boolean) { - val canSendGroupMessage = - configFactory.withUserConfigs { it.userGroups.getClosedGroup(group.hexString) }?.kicked != true - val address = Address.fromSerialized(group.hexString) + val canSendGroupMessage = configFactory.getClosedGroup(group)?.kicked == false if (canSendGroupMessage) { - MessageSender.sendNonDurably( - message = GroupUpdated( + val destination = Destination.ClosedGroup(group.hexString) + + MessageSender.send( + GroupUpdated( GroupUpdateMessage.newBuilder() .setMemberLeftMessage(DataMessage.GroupUpdateMemberLeftMessage.getDefaultInstance()) .build() ), - address = address, + destination, isSyncMessage = false ).await() - MessageSender.sendNonDurably( - message = GroupUpdated( + MessageSender.send( + GroupUpdated( GroupUpdateMessage.newBuilder() .setMemberLeftNotificationMessage(DataMessage.GroupUpdateMemberLeftNotificationMessage.getDefaultInstance()) .build() ), - address = address, + destination, isSyncMessage = false ).await() } pollerFactory.pollerFor(group)?.stop() - // TODO: set "deleted" and post to -10 group namespace? if (deleteOnLeave) { - storage.getThreadId(address)?.let(storage::deleteConversation) + storage.getThreadId(Address.fromSerialized(group.hexString)) + ?.let(storage::deleteConversation) configFactory.removeGroup(group) } } @@ -359,25 +397,25 @@ class GroupManagerV2Impl @Inject constructor( override suspend fun promoteMember( group: AccountId, members: List - ): Unit = withContext(dispatcher) { + ): Unit = withContext(dispatcher + SupervisorJob()) { val adminKey = requireAdminAccess(group) val groupName = configFactory.withGroupConfigs(group) { it.groupInfo.getName() } // Send out the promote message to the members concurrently + val promoteMessage = GroupUpdated( + GroupUpdateMessage.newBuilder() + .setPromoteMessage( + DataMessage.GroupUpdatePromoteMessage.newBuilder() + .setGroupIdentitySeed(ByteString.copyFrom(adminKey)) + .setName(groupName) + ) + .build() + ) + val promotionDeferred = members.associateWith { member -> async { - val message = GroupUpdated( - GroupUpdateMessage.newBuilder() - .setPromoteMessage( - DataMessage.GroupUpdatePromoteMessage.newBuilder() - .setGroupIdentitySeed(ByteString.copyFrom(adminKey)) - .setName(groupName) - ) - .build() - ) - MessageSender.sendNonDurably( - message = message, + message = promoteMessage, address = Address.fromSerialized(member.hexString), isSyncMessage = false ).await() @@ -428,125 +466,25 @@ class GroupManagerV2Impl @Inject constructor( storage.insertGroupInfoChange(message, group) } - private suspend fun doRemoveMembers( - group: AccountId, - removedMembers: List, - sendRemovedMessage: Boolean, - removeMemberMessages: Boolean - ) = withContext(dispatcher) { + private suspend fun flagMembersForRemoval( + group: AccountId, members: List, + alsoRemoveMembersMessage: Boolean, + sendMemberChangeMessage: Boolean + ) { val adminKey = requireAdminAccess(group) - val groupAuth = OwnedSwarmAuth.ofClosedGroup(group, adminKey) - // To remove a member from a group, we need to first: - // 1. Notify the swarm that this member's key has bene revoked - // 2. Send a "kicked" message to a special namespace that the kicked member can still read - // 3. Optionally, send "delete member messages" to the group. (So that every device in the group - // delete this member's messages locally.) - // These three steps will be included in a sequential call as they all need to be done in order. - // After these steps are all done, we will do the following: - // Update the group configs to remove the member, sync if needed, then - // delete the member's messages locally and remotely. - - val essentialRequests = configFactory.withGroupConfigs(group) { configs -> - val messageSendTimestamp = SnodeAPI.nowWithOffset - - buildList { - this += SnodeAPI.buildAuthenticatedRevokeSubKeyBatchRequest( - groupAdminAuth = groupAuth, - subAccountTokens = removedMembers.map(configs.groupKeys::getSubAccountToken) - ) - - this += Sodium.encryptForMultipleSimple( - messages = removedMembers.map { "${it.hexString}-${configs.groupKeys.currentGeneration()}".encodeToByteArray() } - .toTypedArray(), - recipients = removedMembers.map { it.pubKeyBytes }.toTypedArray(), - ed25519SecretKey = adminKey, - domain = Sodium.KICKED_DOMAIN - ).let { encryptedForMembers -> - SnodeAPI.buildAuthenticatedStoreBatchInfo( - namespace = Namespace.REVOKED_GROUP_MESSAGES(), - message = SnodeMessage( - recipient = group.hexString, - data = Base64.encodeBytes(encryptedForMembers), - ttl = SnodeMessage.CONFIG_TTL, - timestamp = messageSendTimestamp - ), - auth = groupAuth - ) - } - - if (removeMemberMessages) { - val adminSignature = - SodiumUtilities.sign( - buildDeleteMemberContentSignature( - memberIds = removedMembers, - messageHashes = emptyList(), - timestamp = messageSendTimestamp - ), adminKey - ) - - this += SnodeAPI.buildAuthenticatedStoreBatchInfo( - namespace = Namespace.CLOSED_GROUP_MESSAGES(), - message = MessageSender.buildWrappedMessageToSnode( - destination = Destination.ClosedGroup(group.hexString), - message = GroupUpdated( - GroupUpdateMessage.newBuilder() - .setDeleteMemberContent( - GroupUpdateDeleteMemberContentMessage.newBuilder() - .addAllMemberSessionIds(removedMembers.map { it.hexString }) - .setAdminSignature(ByteString.copyFrom(adminSignature)) - ) - .build() - ).apply { sentTimestamp = messageSendTimestamp }, - isSyncMessage = false - ), - auth = groupAuth - ) - } - } - } - - val snode = SnodeAPI.getSingleTargetSnode(group.hexString).await() - val responses = SnodeAPI.getBatchResponse( - snode, - group.hexString, - essentialRequests, - sequence = true - ) - - responses.requireAllRequestsSuccessful("Failed to execute essential steps for removing member") - - // Next step: update group configs, rekey, remove member messages if required + // 1. Mark the members as removed in the group configs configFactory.withMutableGroupConfigs(group) { configs -> - removedMembers.forEach { configs.groupMembers.erase(it.hexString) } - configs.rekey() - } - - if (removeMemberMessages) { - val threadId = storage.getThreadId(Address.fromSerialized(group.hexString)) - if (threadId != null) { - val messagesToDelete = mutableListOf() - for (member in removedMembers) { - for (msg in mmsSmsDatabase.getUserMessages(threadId, member.hexString)) { - val serverHash = lokiDatabase.getMessageServerHash(msg.id, msg.isMms) - if (serverHash != null) { - messagesToDelete.add(serverHash) - } - } - - storage.deleteMessagesByUser(threadId, member.hexString) + for (member in members) { + val memberConfig = configs.groupMembers.get(member.hexString) + if (memberConfig != null) { + configs.groupMembers.set(memberConfig.setRemoved(alsoRemoveMembersMessage)) } - - SnodeAPI.sendBatchRequest( - snode, group.hexString, SnodeAPI.buildAuthenticatedDeleteBatchInfo( - groupAuth, - messagesToDelete - ) - ) } } - if (sendRemovedMessage) { + // 2. Send a member change message + if (sendMemberChangeMessage) { val timestamp = SnodeAPI.nowWithOffset val signature = SodiumUtilities.sign( buildMemberChangeSignature( @@ -559,7 +497,7 @@ class GroupManagerV2Impl @Inject constructor( val updateMessage = GroupUpdateMessage.newBuilder() .setMemberChangeMessage( GroupUpdateMemberChangeMessage.newBuilder() - .addAllMemberSessionIds(removedMembers.map { it.hexString }) + .addAllMemberSessionIds(members.map { it.hexString }) .setType(GroupUpdateMemberChangeMessage.Type.REMOVED) .setAdminSignature(ByteString.copyFrom(signature)) ) @@ -567,8 +505,8 @@ class GroupManagerV2Impl @Inject constructor( val message = GroupUpdated( updateMessage ).apply { sentTimestamp = timestamp } - MessageSender.send(message, Destination.ClosedGroup(group.hexString), false) - storage.insertGroupInfoChange(message, group) + + MessageSender.send(message, Destination.ClosedGroup(group.hexString), false).await() } } @@ -671,8 +609,7 @@ class GroupManagerV2Impl @Inject constructor( promoteMessageHash: String? ) = withContext(dispatcher) { val userAuth = requireNotNull(storage.userAuth) { "No current user available" } - val group = - configFactory.withUserConfigs { it.userGroups.getClosedGroup(groupId.hexString) } + val group = configFactory.getClosedGroup(groupId) if (group == null) { // If we haven't got the group in the config, it could mean that we haven't @@ -692,12 +629,13 @@ class GroupManagerV2Impl @Inject constructor( } // Update our promote state - configFactory.withMutableGroupConfigs(recreateConfigInstances = true, groupId = groupId) { configs -> + configFactory.withMutableGroupConfigs( + recreateConfigInstances = true, + groupId = groupId + ) { configs -> configs.groupMembers.get(userAuth.accountId.hexString)?.let { member -> configs.groupMembers.set(member.setPromoteSuccess()) } - - Unit } } @@ -729,7 +667,7 @@ class GroupManagerV2Impl @Inject constructor( inviter: AccountId, ) { // If we have already received an invitation in the past, we should not process this one - if (configFactory.withUserConfigs { it.userGroups.getClosedGroup(groupId.hexString) }?.invited == true) { + if (configFactory.getClosedGroup(groupId)?.invited == true) { return } @@ -893,8 +831,11 @@ class GroupManagerV2Impl @Inject constructor( // If we are admin, we can delete the messages from the group swarm group.adminKey?.let { adminKey -> - SnodeAPI.deleteMessage(groupId.hexString, OwnedSwarmAuth.ofClosedGroup(groupId, adminKey), messageHashes) - .await() + SnodeAPI.deleteMessage( + publicKey = groupId.hexString, + swarmAuth = OwnedSwarmAuth.ofClosedGroup(groupId, adminKey), + serverHashes = messageHashes + ).await() } // Construct a message to ask members to delete the messages, sign if we are admin, then send @@ -978,8 +919,11 @@ class GroupManagerV2Impl @Inject constructor( groupId.hexString ) ) { - SnodeAPI.deleteMessage(groupId.hexString, OwnedSwarmAuth.ofClosedGroup(groupId, adminKey), hashes) - .await() + SnodeAPI.deleteMessage( + groupId.hexString, + OwnedSwarmAuth.ofClosedGroup(groupId, adminKey), + hashes + ).await() } // The non-admin user shouldn't be able to delete other user's messages so we will diff --git a/libsession-util/src/main/java/network/loki/messenger/libsession_util/util/Sodium.kt b/libsession-util/src/main/java/network/loki/messenger/libsession_util/util/Sodium.kt index 7479177e39..f2a6cd02e2 100644 --- a/libsession-util/src/main/java/network/loki/messenger/libsession_util/util/Sodium.kt +++ b/libsession-util/src/main/java/network/loki/messenger/libsession_util/util/Sodium.kt @@ -1,9 +1,11 @@ package network.loki.messenger.libsession_util.util +import java.util.regex.Pattern + object Sodium { const val KICKED_DOMAIN = "SessionGroupKickedMessage" - val KICKED_REGEX = Regex("05\\w{64}-\\d+") + val KICKED_REGEX: Pattern = Pattern.compile("^(05[a-zA-Z0-9]{64})(\\d+)$") init { System.loadLibrary("session_util") diff --git a/libsession/src/main/java/org/session/libsession/messaging/MessagingModuleConfiguration.kt b/libsession/src/main/java/org/session/libsession/messaging/MessagingModuleConfiguration.kt index f56b74d8b0..ac25cab2fa 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/MessagingModuleConfiguration.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/MessagingModuleConfiguration.kt @@ -7,6 +7,7 @@ import org.session.libsession.database.StorageProtocol import org.session.libsession.messaging.groups.GroupManagerV2 import org.session.libsession.messaging.notifications.TokenFetcher import org.session.libsession.snode.OwnedSwarmAuth +import org.session.libsession.snode.SnodeClock import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsession.utilities.Device import org.session.libsession.utilities.Toaster @@ -22,6 +23,7 @@ class MessagingModuleConfiguration( val toaster: Toaster, val tokenFetcher: TokenFetcher, val groupManagerV2: GroupManagerV2, + val clock: SnodeClock, ) { companion object { diff --git a/libsession/src/main/java/org/session/libsession/messaging/groups/GroupManagerV2.kt b/libsession/src/main/java/org/session/libsession/messaging/groups/GroupManagerV2.kt index 6d6ac37c93..c78dc6e762 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/groups/GroupManagerV2.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/groups/GroupManagerV2.kt @@ -29,7 +29,21 @@ interface GroupManagerV2 { removeMessages: Boolean ) - suspend fun handleMemberLeft(message: GroupUpdated, closedGroupId: AccountId) + /** + * Remove all messages from the group for the given members. + * + * This will delete all messages locally, and, if the user is an admin, remotely as well. + * + * Note: unlike [handleDeleteMemberContent], [requestMessageDeletion], this method + * does not try to validate the validity of the request, it also does not ask other members + * to delete the messages. It simply removes what it can. + */ + suspend fun removeMemberMessages( + groupAccountId: AccountId, + members: List + ) + + suspend fun handleMemberLeft(message: GroupUpdated, group: AccountId) suspend fun leaveGroup(group: AccountId, deleteOnLeave: Boolean) @@ -59,8 +73,22 @@ interface GroupManagerV2 { suspend fun setName(groupId: AccountId, newName: String) + /** + * Send a request to the group to delete the given messages. + * + * It can be called by a regular member who wishes to delete their own messages. + * It can also called by an admin, who can delete any messages from any member. + */ suspend fun requestMessageDeletion(groupId: AccountId, messageHashes: List) + /** + * Handle a request to delete a member's content from the group. This is called when we receive + * a message from the server that a member's content needs to be deleted. (usually sent by + * [requestMessageDeletion], for example) + * + * In contrast to [removeMemberMessages], where it will remove the messages blindly, this method + * will check if the right conditions are met before removing the messages. + */ suspend fun handleDeleteMemberContent( groupId: AccountId, deleteMemberContent: GroupUpdateDeleteMemberContentMessage, diff --git a/libsession/src/main/java/org/session/libsession/messaging/groups/RemoveGroupMemberHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/groups/RemoveGroupMemberHandler.kt index 1a7fe3da16..4fe0bfe22c 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/groups/RemoveGroupMemberHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/groups/RemoveGroupMemberHandler.kt @@ -2,10 +2,8 @@ package org.session.libsession.messaging.groups import android.os.SystemClock import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.Job -import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async import kotlinx.coroutines.delay import kotlinx.coroutines.flow.first @@ -23,6 +21,7 @@ import org.session.libsession.snode.SnodeMessage import org.session.libsession.snode.utilities.await import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsession.utilities.TextSecurePreferences +import org.session.libsession.utilities.waitUntilGroupConfigsPushed import org.session.libsignal.protos.SignalServiceProtos import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateMessage import org.session.libsignal.utilities.AccountId @@ -35,17 +34,22 @@ private const val TAG = "RemoveGroupMemberHandler" private const val MIN_PROCESS_INTERVAL_MILLS = 1_000L +/** + * This handler is responsible for processing pending group member removals. + * + * It automatically does so by listening to the config updates changes and checking for any pending removals. + */ class RemoveGroupMemberHandler @Inject constructor( private val configFactory: ConfigFactoryProtocol, private val textSecurePreferences: TextSecurePreferences, + private val groupManager: GroupManagerV2, ) { - private val scope: CoroutineScope = GlobalScope private var job: Job? = null fun start() { require(job == null) { "Already started" } - job = scope.launch { + job = GlobalScope.launch { while (true) { // Make sure we have a local number before we start processing textSecurePreferences.watchLocalNumber().first { it != null } @@ -74,74 +78,54 @@ class RemoveGroupMemberHandler @Inject constructor( } private suspend fun processPendingMemberRemoval() { - // Run the removal process for each group in parallel - val removalTasks = configFactory.withUserConfigs { it.userGroups.allClosedGroupInfo() } + configFactory.withUserConfigs { it.userGroups.allClosedGroupInfo() } .asSequence() .filter { it.hasAdminKey() } - .associate { group -> - group.name to scope.async { - processPendingRemovalsForGroup( - groupAccountId = group.groupAccountId, - groupName = group.name, - adminKey = group.adminKey!! - ) - } + .forEach { group -> + processPendingRemovalsForGroup(group.groupAccountId, group.adminKey!!) } - - // Wait and collect the results of the removal tasks - for ((groupName, task) in removalTasks) { - try { - task.await() - } catch (e: Exception) { - Log.e(TAG, "Error processing pending removals for group $groupName", e) - } - } } private suspend fun processPendingRemovalsForGroup( groupAccountId: AccountId, - groupName: String, adminKey: ByteArray ) { - val swarmAuth = OwnedSwarmAuth( - accountId = groupAccountId, - ed25519PublicKeyHex = null, - ed25519PrivateKey = adminKey - ) + val groupAuth = OwnedSwarmAuth.ofClosedGroup(groupAccountId, adminKey) - val batchCalls = configFactory.withGroupConfigs(groupAccountId) { configs -> + val (pendingRemovals, batchCalls) = configFactory.withGroupConfigs(groupAccountId) { configs -> val pendingRemovals = configs.groupMembers.all().filter { it.removed } if (pendingRemovals.isEmpty()) { // Skip if there are no pending removals - return@withGroupConfigs emptyList() + return@withGroupConfigs pendingRemovals to emptyList() } - Log.d(TAG, "Processing ${pendingRemovals.size} pending removals for group $groupName") + Log.d(TAG, "Processing ${pendingRemovals.size} pending removals for group") // Perform a sequential call to group snode to: // 1. Revoke the member's sub key (by adding the key to a "revoked list" under the hood) - // 2. Send a message to a special namespace to inform the removed members they have been removed - // 3. Conditionally, delete removed-members' messages from the group's message store, if that option is selected by the actioning admin + // 2. Send a message to a special namespace on the group to inform the removed members they have been removed + // 3. Conditionally, send a `GroupUpdateDeleteMemberContent` to the group so the message deletion + // can be performed by everyone in the group. val calls = ArrayList(3) // Call No 1. Revoke sub-key. This call is crucial and must not fail for the rest of the operation to be successful. calls += checkNotNull( SnodeAPI.buildAuthenticatedRevokeSubKeyBatchRequest( - groupAdminAuth = swarmAuth, + groupAdminAuth = groupAuth, subAccountTokens = pendingRemovals.map { configs.groupKeys.getSubAccountToken(AccountId(it.sessionId)) } ) ) { "Fail to create a revoke request" } - // Call No 2. Send a message to the removed members + // Call No 2. Send a "kicked" message to the revoked namespace calls += SnodeAPI.buildAuthenticatedStoreBatchInfo( namespace = Namespace.REVOKED_GROUP_MESSAGES(), message = buildGroupKickMessage(groupAccountId.hexString, pendingRemovals, configs.groupKeys, adminKey), - auth = swarmAuth, + auth = groupAuth, ) - // Call No 3. Conditionally remove the message from the group's message store + // Call No 3. Conditionally send the `GroupUpdateDeleteMemberContent` if (pendingRemovals.any { it.shouldRemoveMessages }) { calls += SnodeAPI.buildAuthenticatedStoreBatchInfo( namespace = Namespace.CLOSED_GROUP_MESSAGES(), @@ -152,11 +136,11 @@ class RemoveGroupMemberHandler @Inject constructor( .filter { it.shouldRemoveMessages } .map { it.sessionId } ), - auth = swarmAuth, + auth = groupAuth, ) } - calls + pendingRemovals to (calls as List) } if (batchCalls.isEmpty()) { @@ -164,9 +148,39 @@ class RemoveGroupMemberHandler @Inject constructor( } val node = SnodeAPI.getSingleTargetSnode(groupAccountId.hexString).await() - SnodeAPI.getBatchResponse(node, groupAccountId.hexString, batchCalls, true) + val response = SnodeAPI.getBatchResponse(node, groupAccountId.hexString, batchCalls, sequence = true) - //TODO: Handle message removal + val firstError = response.results.firstOrNull { !it.isSuccessful } + check(firstError == null) { + "Error processing pending removals for group: code = ${firstError?.code}, body = ${firstError?.body}" + } + + Log.d(TAG, "Essential steps for group removal are done") + + // The essential part of the operation has been successful once we get to this point, + // now we can go ahead and update the configs + configFactory.withMutableGroupConfigs(groupAccountId) { configs -> + pendingRemovals.forEach(configs.groupMembers::erase) + configs.rekey() + } + + configFactory.waitUntilGroupConfigsPushed(groupAccountId) + + Log.d(TAG, "Group configs updated") + + // Try to delete members' message. It's ok to fail as they will be re-tried in different + // cases (a.k.a the GroupUpdateDeleteMemberContent message handling) and could be by different admins. + val deletingMessagesForMembers = pendingRemovals.filter { it.shouldRemoveMessages } + if (deletingMessagesForMembers.isNotEmpty()) { + try { + groupManager.removeMemberMessages( + groupAccountId, + deletingMessagesForMembers.map { AccountId(it.sessionId) } + ) + } catch (e: Exception) { + Log.e(TAG, "Error deleting messages for removed members", e) + } + } } private fun buildDeleteGroupMemberContentMessage( @@ -211,7 +225,7 @@ class RemoveGroupMemberHandler @Inject constructor( domain = Sodium.KICKED_DOMAIN ) ), - ttl = SnodeMessage.CONFIG_TTL, + ttl = SnodeMessage.DEFAULT_TTL, timestamp = SnodeAPI.nowWithOffset ) } diff --git a/libsession/src/main/java/org/session/libsession/messaging/messages/Message.kt b/libsession/src/main/java/org/session/libsession/messaging/messages/Message.kt index cc0853d048..f61277b573 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/messages/Message.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/messages/Message.kt @@ -5,6 +5,7 @@ import org.session.libsession.database.StorageProtocol import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate import org.session.libsession.messaging.messages.visible.VisibleMessage +import org.session.libsession.snode.SnodeMessage import org.session.libsignal.protos.SignalServiceProtos import org.session.libsignal.protos.SignalServiceProtos.Content.ExpirationType @@ -25,7 +26,7 @@ abstract class Message { open val coerceDisappearAfterSendToRead = false - open val defaultTtl: Long = 14 * 24 * 60 * 60 * 1000 + open val defaultTtl: Long = SnodeMessage.DEFAULT_TTL open val ttl: Long get() = specifiedTtl ?: defaultTtl open val isSelfSendValid: Boolean = false diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt index c658847831..be4756d835 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt @@ -55,6 +55,7 @@ import org.session.libsignal.crypto.ecc.DjbECPublicKey import org.session.libsignal.crypto.ecc.ECKeyPair import org.session.libsignal.messages.SignalServiceGroup import org.session.libsignal.protos.SignalServiceProtos +import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateMemberChangeMessage import org.session.libsignal.protos.SignalServiceProtos.SharedConfigMessage import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.Base64 diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt index 3a4123e5af..ae3135167e 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt @@ -5,10 +5,10 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred import kotlinx.coroutines.Job import kotlinx.coroutines.async -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import kotlinx.coroutines.supervisorScope import network.loki.messenger.libsession_util.util.Sodium import org.session.libsession.database.StorageProtocol import org.session.libsession.messaging.groups.GroupManagerV2 @@ -22,7 +22,6 @@ import org.session.libsession.snode.model.RetrieveMessageResponse import org.session.libsession.snode.utilities.await import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsession.utilities.ConfigMessage -import org.session.libsession.utilities.getClosedGroup import org.session.libsignal.database.LokiAPIDatabaseProtocol import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.Log @@ -102,9 +101,9 @@ class ClosedGroupPoller( job = null } - private suspend fun poll(snode: Snode): Unit = coroutineScope { + private suspend fun poll(snode: Snode): Unit = supervisorScope { val groupAuth = - configFactoryProtocol.getGroupAuth(closedGroupSessionId) ?: return@coroutineScope + configFactoryProtocol.getGroupAuth(closedGroupSessionId) ?: return@supervisorScope val configHashesToExtends = configFactoryProtocol.withGroupConfigs(closedGroupSessionId) { buildSet { addAll(it.groupKeys.currentHashes()) @@ -121,23 +120,21 @@ class ClosedGroupPoller( val pollingTasks = mutableListOf>>() - pollingTasks += "retrieving revoked messages" to async { - handleRevoked( - SnodeAPI.sendBatchRequest( - snode, - closedGroupSessionId.hexString, - SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - lastHash = lokiApiDatabase.getLastMessageHashValue( - snode, - closedGroupSessionId.hexString, - Namespace.REVOKED_GROUP_MESSAGES() - ).orEmpty(), - auth = groupAuth, - namespace = Namespace.REVOKED_GROUP_MESSAGES(), - maxSize = null, - ), - RetrieveMessageResponse::class.java - ) + val receiveRevokeMessage = async { + SnodeAPI.sendBatchRequest( + snode, + closedGroupSessionId.hexString, + SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + lastHash = lokiApiDatabase.getLastMessageHashValue( + snode, + closedGroupSessionId.hexString, + Namespace.REVOKED_GROUP_MESSAGES() + ).orEmpty(), + auth = groupAuth, + namespace = Namespace.REVOKED_GROUP_MESSAGES(), + maxSize = null, + ), + RetrieveMessageResponse::class.java ) } @@ -198,18 +195,28 @@ class ClosedGroupPoller( } } - // The retrieval of the config and regular messages can be done concurrently, + // The retrieval of the all group messages can be done concurrently, // however, in order for the messages to be able to be decrypted, the config messages // must be processed first. pollingTasks += "polling and handling group config keys and messages" to async { - val (keysMessage, infoMessage, membersMessage) = groupConfigRetrieval.map { it.await() } - saveLastMessageHash(snode, keysMessage, Namespace.ENCRYPTION_KEYS()) - saveLastMessageHash(snode, infoMessage, Namespace.CLOSED_GROUP_INFO()) - saveLastMessageHash(snode, membersMessage, Namespace.CLOSED_GROUP_MEMBERS()) - handleGroupConfigMessages(keysMessage, infoMessage, membersMessage) + val result = runCatching { + val (keysMessage, infoMessage, membersMessage) = groupConfigRetrieval.map { it.await() } + handleGroupConfigMessages(keysMessage, infoMessage, membersMessage) + saveLastMessageHash(snode, keysMessage, Namespace.ENCRYPTION_KEYS()) + saveLastMessageHash(snode, infoMessage, Namespace.CLOSED_GROUP_INFO()) + saveLastMessageHash(snode, membersMessage, Namespace.CLOSED_GROUP_MEMBERS()) - val regularMessages = groupMessageRetrieval.await() - handleMessages(regularMessages, snode) + val regularMessages = groupMessageRetrieval.await() + handleMessages(regularMessages, snode) + } + + // Revoke message must be handled regardless, and at the end + val revokedMessages = receiveRevokeMessage.await() + handleRevoked(revokedMessages) + saveLastMessageHash(snode, revokedMessages, Namespace.REVOKED_GROUP_MESSAGES()) + + // Propagate any prior exceptions + result.getOrThrow() } // Wait for all tasks to complete, gather any exceptions happened during polling @@ -254,23 +261,23 @@ class ClosedGroupPoller( ) if (decoded != null) { - Log.d(TAG, "decoded kick message was for us") val message = decoded.decodeToString() - if (Sodium.KICKED_REGEX.matches(message)) { - val (sessionId, generation) = message.split("-") - val currentKeysGeneration by lazy { - configFactoryProtocol.withGroupConfigs(closedGroupSessionId) { - it.groupKeys.currentGeneration() - } + val matcher = Sodium.KICKED_REGEX.matcher(message) + if (matcher.matches()) { + val sessionId = matcher.group(1) + val messageGeneration = matcher.group(2)!!.toInt() + val currentKeysGeneration = configFactoryProtocol.withGroupConfigs(closedGroupSessionId) { + it.groupKeys.currentGeneration() } - if (sessionId == storage.getUserPublicKey() && generation.toInt() >= currentKeysGeneration) { - try { - groupManagerV2.handleKicked(closedGroupSessionId) - } catch (e: Exception) { - Log.e("GroupPoller", "Error handling kicked message: $e") - } + val isForMe = sessionId == storage.getUserPublicKey() + Log.d(TAG, "Received kicked message, for us? ${sessionId == storage.getUserPublicKey()}, message key generation = $messageGeneration, our key generation = $currentKeysGeneration") + + if (isForMe && messageGeneration >= currentKeysGeneration) { + groupManagerV2.handleKicked(closedGroupSessionId) } + } else { + Log.w(TAG, "Received an invalid kicked message") } } } diff --git a/libsession/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt b/libsession/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt index dd0fdec410..c25a6ba1db 100644 --- a/libsession/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt @@ -1,6 +1,5 @@ package org.session.libsession.snode -import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch import nl.komponents.kovenant.Deferred @@ -26,12 +25,10 @@ import org.session.libsignal.utilities.HTTP import org.session.libsignal.utilities.JsonUtil import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Snode -import org.session.libsignal.utilities.ThreadUtils import org.session.libsignal.utilities.recover import org.session.libsignal.utilities.toHexString import java.util.concurrent.atomic.AtomicReference import kotlin.collections.set -import kotlin.coroutines.EmptyCoroutineContext private typealias Path = List @@ -603,11 +600,7 @@ object OnionRequestAPI { val bodyAsString = json["body"] as String JsonUtil.fromJson(bodyAsString, Map::class.java) } - if (body["t"] != null) { - val timestamp = body["t"] as Long - val offset = timestamp - System.currentTimeMillis() - SnodeAPI.clockOffset = offset - } + if (body.containsKey("hf")) { @Suppress("UNCHECKED_CAST") val currentHf = body["hf"] as List diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index c484259d50..7578f2e8ae 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -25,6 +25,7 @@ import nl.komponents.kovenant.functional.bind import nl.komponents.kovenant.functional.map import nl.komponents.kovenant.task import nl.komponents.kovenant.unwrap +import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.utilities.MessageWrapper import org.session.libsession.messaging.utilities.SodiumUtilities.sodium import org.session.libsession.snode.model.BatchResponse @@ -47,6 +48,7 @@ import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Snode import org.session.libsignal.utilities.prettifiedDescription import org.session.libsignal.utilities.retryIfNeeded +import java.util.Date import java.util.Locale import kotlin.collections.component1 import kotlin.collections.component2 @@ -63,15 +65,11 @@ object SnodeAPI { internal var snodePool: Set get() = database.getSnodePool() set(newValue) { database.setSnodePool(newValue) } - /** - * The offset between the user's clock and the Service Node's clock. Used in cases where the - * user's clock is incorrect. - */ - internal var clockOffset = 0L + @Deprecated("Use a dependency injected SnodeClock.currentTimeMills() instead") @JvmStatic val nowWithOffset - get() = System.currentTimeMillis() + clockOffset + get() = MessagingModuleConfiguration.shared.clock.currentTimeMills() internal var forkInfo by observable(database.getForkInfo()) { _, oldValue, newValue -> if (newValue > oldValue) { @@ -418,7 +416,6 @@ object SnodeAPI { namespace = namespace, auth = auth, verificationData = { ns, t -> "${Snode.Method.SendMessage.rawValue}$ns$t" }, - timestamp = message.timestamp ) { putAll(message.toJSON()) } @@ -785,7 +782,7 @@ object SnodeAPI { parseRawMessagesResponse(resp, snode, auth.accountId.hexString) } - private fun getNetworkTime(snode: Snode): Promise, Exception> = + fun getNetworkTime(snode: Snode): Promise, Exception> = invoke(Snode.Method.Info, snode, emptyMap()).map { rawResponse -> val timestamp = rawResponse["timestamp"] as? Long ?: -1 snode to timestamp @@ -805,13 +802,15 @@ object SnodeAPI { "Message sent to ${message.recipient} but authenticated with ${auth.accountId.hexString}" } + val timestamp = nowWithOffset + buildAuthenticatedParameters( auth = auth, namespace = namespace, verificationData = { ns, t -> "${Snode.Method.SendMessage.rawValue}$ns$t" }, - timestamp = message.timestamp + timestamp = timestamp ) { - put("sig_timestamp", message.timestamp) + put("sig_timestamp", timestamp) putAll(message.toJSON()) } } else { @@ -921,7 +920,7 @@ object SnodeAPI { fun deleteAllMessages(auth: SwarmAuth): Promise, Exception> = scope.retrySuspendAsPromise(maxRetryCount) { val snode = getSingleTargetSnode(auth.accountId.hexString).await() - val (_, timestamp) = getNetworkTime(snode).await() + val timestamp = MessagingModuleConfiguration.shared.clock.waitForNetworkAdjustedTime() val params = buildAuthenticatedParameters( auth = auth, diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeClock.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeClock.kt new file mode 100644 index 0000000000..c2f8db3b0d --- /dev/null +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeClock.kt @@ -0,0 +1,88 @@ +package org.session.libsession.snode + +import android.os.SystemClock +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.filterNotNull +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.launch +import org.session.libsession.snode.utilities.await +import org.session.libsignal.utilities.Log +import java.util.Date + +/** + * A class that manages the network time by querying the network time from a random snode. The + * primary goal of this class is to provide a time that is not tied to current system time and not + * prone to time changes locally. + * + * Before the first network query is successfully, calling [currentTimeMills] will return the current + * system time. + */ +class SnodeClock() { + private val instantState = MutableStateFlow(null) + private var job: Job? = null + + fun start() { + require(job == null) { "Already started" } + + job = GlobalScope.launch { + while (true) { + try { + val node = SnodeAPI.getRandomSnode().await() + val requestStarted = SystemClock.uptimeMillis() + + var networkTime = SnodeAPI.getNetworkTime(node).await().second + val requestEnded = SystemClock.uptimeMillis() + + // Adjust the network time to account for the time it took to make the request + // so that the network time equals to the time when the request was started + networkTime -= (requestEnded - requestStarted) / 2 + + val inst = Instant(requestStarted, networkTime) + + Log.d("SnodeClock", "Network time: ${Date(inst.now())}, system time: ${Date()}") + + instantState.value = inst + } catch (e: Exception) { + Log.e("SnodeClock", "Failed to get network time. Retrying in a few seconds", e) + } finally { + // Retry frequently if we haven't got any result before + val delayMills = if (instantState.value == null) { + 3_000L + } else { + 3600_000L + } + + delay(delayMills) + } + } + } + } + + /** + * Wait for the network adjusted time to come through. + */ + suspend fun waitForNetworkAdjustedTime(): Long { + return instantState.filterNotNull().first().now() + } + + /** + * Get the current time in milliseconds. If the network time is not available yet, this method + * will return the current system time. + */ + fun currentTimeMills(): Long { + return instantState.value?.now() ?: System.currentTimeMillis() + } + + private class Instant( + val systemUptime: Long, + val networkTime: Long, + ) { + fun now(): Long { + val elapsed = SystemClock.uptimeMillis() - systemUptime + return networkTime + elapsed + } + } +} \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeMessage.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeMessage.kt index a44896eebf..8fc22a8303 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeMessage.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeMessage.kt @@ -32,6 +32,7 @@ data class SnodeMessage( } companion object { - const val CONFIG_TTL: Long = 30 * 24 * 60 * 60 * 1000L + const val CONFIG_TTL: Long = 30 * 24 * 60 * 60 * 1000L // 30 days + const val DEFAULT_TTL: Long = 14 * 24 * 60 * 60 * 1000L // 14 days } } diff --git a/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt b/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt index 6a912f4bf7..33dcc5cb59 100644 --- a/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt @@ -1,6 +1,11 @@ package org.session.libsession.utilities import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.withTimeoutOrNull import network.loki.messenger.libsession_util.MutableConfig import network.loki.messenger.libsession_util.MutableContacts import network.loki.messenger.libsession_util.MutableConversationVolatileConfig @@ -94,6 +99,52 @@ fun ConfigFactoryProtocol.getClosedGroup(groupId: AccountId): GroupInfo.ClosedGr return withUserConfigs { it.userGroups.getClosedGroup(groupId.hexString) } } +/** + * Wait until all user configs are pushed to the server. + * + * This function is not essential to the pushing of the configs, the config push will schedule + * itself upon changes, so this function is purely observatory. + * + * This function will check the user configs immediately, if nothing needs to be pushed, it will return immediately. + * + * @return True if all user configs are pushed, false if the timeout is reached. + */ +suspend fun ConfigFactoryProtocol.waitUntilUserConfigsPushed(timeoutMills: Long = 10_000L): Boolean { + fun needsPush() = withUserConfigs { configs -> + UserConfigType.entries.any { configs.getConfig(it).needsPush() } + } + + return withTimeoutOrNull(timeoutMills){ + configUpdateNotifications + .onStart { emit(ConfigUpdateNotification.UserConfigs) } // Trigger the filtering immediately + .filter { it == ConfigUpdateNotification.UserConfigs && !needsPush() } + .first() + } != null +} + +/** + * Wait until all configs of given group are pushed to the server. + * + * This function is not essential to the pushing of the configs, the config push will schedule + * itself upon changes, so this function is purely observatory. + * + * This function will check the group configs immediately, if nothing needs to be pushed, it will return immediately. + * + * @return True if all group configs are pushed, false if the timeout is reached. + */ +suspend fun ConfigFactoryProtocol.waitUntilGroupConfigsPushed(groupId: AccountId, timeoutMills: Long = 10_000L): Boolean { + fun needsPush() = withGroupConfigs(groupId) { configs -> + configs.groupInfo.needsPush() || configs.groupMembers.needsPush() + } + + return withTimeoutOrNull(timeoutMills) { + configUpdateNotifications + .onStart { emit(ConfigUpdateNotification.GroupConfigsUpdated(groupId)) } // Trigger the filtering immediately + .filter { it == ConfigUpdateNotification.GroupConfigsUpdated(groupId) && !needsPush() } + .first() + } != null +} + interface UserConfigs { val contacts: ReadableContacts val userGroups: ReadableUserGroupsConfig