From 1f5fde0d9a2aa19992836fcaacfeea343274c947 Mon Sep 17 00:00:00 2001 From: SessionHero01 <180888785+SessionHero01@users.noreply.github.com> Date: Wed, 2 Oct 2024 15:17:13 +1000 Subject: [PATCH] Various issues --- .../securesms/dependencies/ConfigFactory.kt | 23 +++--- .../securesms/groups/GroupManagerV2Impl.kt | 6 +- .../onboarding/loading/LoadingViewModel.kt | 2 +- .../messaging/configs/ConfigSyncHandler.kt | 36 ++++++--- .../messaging/groups/GroupManagerV2.kt | 5 +- .../messaging/jobs/MessageSendJob.kt | 2 +- .../messages/control/GroupUpdated.kt | 4 +- .../ReceivedMessageHandler.kt | 5 +- .../pollers/ClosedGroupPoller.kt | 58 ++++++++------ .../libsession/snode/OnionRequestAPI.kt | 10 +-- .../snode/OnionRequestEncryption.kt | 78 +++++++++---------- .../libsession/snode/utilities/PromiseUtil.kt | 2 +- .../utilities/ConfigFactoryProtocol.kt | 6 +- 13 files changed, 133 insertions(+), 104 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt index 26a3a629d3..58ef0cd562 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt @@ -41,6 +41,7 @@ import org.session.libsession.utilities.MutableUserConfigs import org.session.libsession.utilities.TextSecurePreferences import org.session.libsession.utilities.UserConfigType import org.session.libsession.utilities.UserConfigs +import org.session.libsession.utilities.getClosedGroup import org.session.libsignal.crypto.ecc.DjbECPublicKey import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.Hex @@ -299,11 +300,7 @@ class ConfigFactory @Inject constructor( override fun withGroupConfigs(groupId: AccountId, cb: (GroupConfigs) -> T): T { val configs = groupConfigs.getOrPut(groupId) { - val groupAdminKey = requireNotNull(withUserConfigs { - it.userGroups.getClosedGroup(groupId.hexString) - }) { - "Group not found" - }.adminKey + val groupAdminKey = getClosedGroup(groupId)?.adminKey GroupConfigsImpl( requiresCurrentUserED25519SecKey(), @@ -318,7 +315,14 @@ class ConfigFactory @Inject constructor( } } - private fun doWithMutableGroupConfigs(groupId: AccountId, cb: (GroupConfigsImpl) -> Pair): T { + private fun doWithMutableGroupConfigs( + groupId: AccountId, + recreateConfigInstances: Boolean, + cb: (GroupConfigsImpl) -> Pair): T { + if (recreateConfigInstances) { + groupConfigs.remove(groupId) + } + val (result, changed) = withGroupConfigs(groupId) { configs -> cb(configs as GroupConfigsImpl) } @@ -336,9 +340,10 @@ class ConfigFactory @Inject constructor( override fun withMutableGroupConfigs( groupId: AccountId, + recreateConfigInstances: Boolean, cb: (MutableGroupConfigs) -> T ): T { - return doWithMutableGroupConfigs(groupId) { + return doWithMutableGroupConfigs(recreateConfigInstances = recreateConfigInstances, groupId = groupId) { cb(it) to it.dumpIfNeeded() } } @@ -376,7 +381,7 @@ class ConfigFactory @Inject constructor( info: List, members: List ) { - doWithMutableGroupConfigs(groupId) { configs -> + doWithMutableGroupConfigs(groupId, false) { configs -> // Keys must be loaded first as they are used to decrypt the other config messages val keysLoaded = keys.fold(false) { acc, msg -> configs.groupKeys.loadKey(msg.data, msg.hash, msg.timestamp, configs.groupInfo.pointer, configs.groupMembers.pointer) || acc @@ -424,7 +429,7 @@ class ConfigFactory @Inject constructor( return } - doWithMutableGroupConfigs(groupId) { configs -> + doWithMutableGroupConfigs(groupId, false) { configs -> members?.let { (push, result) -> configs.groupMembers.confirmPushed(push.seqNo, result.hash) } info?.let { (push, result) -> configs.groupInfo.confirmPushed(push.seqNo, result.hash) } keysPush?.let { (hash, timestamp) -> diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt index 5b191351c0..8bd89f2c53 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt @@ -634,7 +634,7 @@ class GroupManagerV2Impl @Inject constructor( pollerFactory.pollerFor(group.groupAccountId)?.start() } - override suspend fun onReceiveInvitation( + override suspend fun handleInvitation( groupId: AccountId, groupName: String, authData: ByteArray, @@ -663,7 +663,7 @@ class GroupManagerV2Impl @Inject constructor( } } - override suspend fun onReceivePromotion( + override suspend fun handlePromotion( groupId: AccountId, groupName: String, adminKey: ByteArray, @@ -692,7 +692,7 @@ class GroupManagerV2Impl @Inject constructor( } // Update our promote state - configFactory.withMutableGroupConfigs(groupId) { configs -> + configFactory.withMutableGroupConfigs(recreateConfigInstances = true, groupId = groupId) { configs -> configs.groupMembers.get(userAuth.accountId.hexString)?.let { member -> configs.groupMembers.set(member.setPromoteSuccess()) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/onboarding/loading/LoadingViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/onboarding/loading/LoadingViewModel.kt index 5b8e8bac58..173a2aa71f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/onboarding/loading/LoadingViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/onboarding/loading/LoadingViewModel.kt @@ -60,7 +60,7 @@ internal class LoadingViewModel @Inject constructor( val events = _events.asSharedFlow() init { - viewModelScope.launch(Dispatchers.IO) { + viewModelScope.launch { state.flatMapLatest { when (it) { State.LOADING -> progress(0f, 1f, TIMEOUT_TIME) diff --git a/libsession/src/main/java/org/session/libsession/messaging/configs/ConfigSyncHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/configs/ConfigSyncHandler.kt index 78c3be487b..c5c81c14af 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/configs/ConfigSyncHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/configs/ConfigSyncHandler.kt @@ -33,6 +33,7 @@ import org.session.libsignal.utilities.Base64 import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Namespace import org.session.libsignal.utilities.Snode +import org.session.libsignal.utilities.retryWithUniformInterval import java.util.concurrent.Executors import javax.inject.Inject @@ -59,7 +60,6 @@ class ConfigSyncHandler @Inject constructor( configFactory.configUpdateNotifications .collect { changes -> - try { when (changes) { is ConfigUpdateNotification.GroupConfigsDeleted -> { groupMutex.remove(changes.groupId) @@ -68,24 +68,32 @@ class ConfigSyncHandler @Inject constructor( is ConfigUpdateNotification.GroupConfigsUpdated -> { // Group config pushing is limited to its own dispatcher launch { - groupMutex.getOrPut(changes.groupId) { Mutex() }.withLock { - pushGroupConfigsChangesIfNeeded(changes.groupId) + try { + retryWithUniformInterval { + groupMutex.getOrPut(changes.groupId) { Mutex() }.withLock { + pushGroupConfigsChangesIfNeeded(changes.groupId) + } + } + } catch (e: Exception) { + Log.e(TAG, "Failed to push group configs", e) } } } ConfigUpdateNotification.UserConfigs -> launch { - userMutex.withLock { - pushUserConfigChangesIfNeeded() + try { + retryWithUniformInterval { + userMutex.withLock { + pushUserConfigChangesIfNeeded() + } + } + } catch (e: Exception) { + Log.e(TAG, "Failed to push user configs", e) } } } - } catch (e: Exception) { - Log.e(TAG, "Error handling config update", e) } - } } - } private suspend fun pushGroupConfigsChangesIfNeeded(groupId: AccountId) = coroutineScope { @@ -241,11 +249,17 @@ class ConfigSyncHandler @Inject constructor( val pushTasks = pushes.map { (configType, configPush) -> async { - (configType to configPush) to pushConfig(userAuth, snode, configPush, configType.namespace) + (configType to configPush) to pushConfig( + userAuth, + snode, + configPush, + configType.namespace + ) } } - val pushResults = pushTasks.awaitAll().associate { it.first.first to (it.first.second to it.second) } + val pushResults = + pushTasks.awaitAll().associate { it.first.first to (it.first.second to it.second) } Log.d(TAG, "Pushed ${pushResults.size} user configs") diff --git a/libsession/src/main/java/org/session/libsession/messaging/groups/GroupManagerV2.kt b/libsession/src/main/java/org/session/libsession/messaging/groups/GroupManagerV2.kt index 89b8776fcf..6d6ac37c93 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/groups/GroupManagerV2.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/groups/GroupManagerV2.kt @@ -3,7 +3,6 @@ package org.session.libsession.messaging.groups import org.session.libsession.messaging.contacts.Contact import org.session.libsession.messaging.messages.control.GroupUpdated import org.session.libsession.utilities.recipients.Recipient -import org.session.libsignal.protos.SignalServiceProtos import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateDeleteMemberContentMessage import org.session.libsignal.utilities.AccountId @@ -36,7 +35,7 @@ interface GroupManagerV2 { suspend fun promoteMember(group: AccountId, members: List) - suspend fun onReceiveInvitation( + suspend fun handleInvitation( groupId: AccountId, groupName: String, authData: ByteArray, @@ -44,7 +43,7 @@ interface GroupManagerV2 { inviteMessageHash: String? ) - suspend fun onReceivePromotion( + suspend fun handlePromotion( groupId: AccountId, groupName: String, adminKey: ByteArray, diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt index a85b4b73d1..d28d38c771 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt @@ -94,7 +94,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { } private fun handleFailure(dispatcherName: String, error: Exception) { - Log.w(TAG, "Failed to send $message::class.simpleName.") + Log.w(TAG, "Failed to send $message::class.simpleName.", error) val message = message as? VisibleMessage if (message != null) { if (!MessagingModuleConfiguration.shared.messageDataProvider.isOutgoingMessage(message.sentTimestamp!!)) { diff --git a/libsession/src/main/java/org/session/libsession/messaging/messages/control/GroupUpdated.kt b/libsession/src/main/java/org/session/libsession/messaging/messages/control/GroupUpdated.kt index 4f49c192e5..b7a8c983ab 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/messages/control/GroupUpdated.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/messages/control/GroupUpdated.kt @@ -4,7 +4,9 @@ import org.session.libsignal.protos.SignalServiceProtos.Content import org.session.libsignal.protos.SignalServiceProtos.DataMessage import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateMessage -class GroupUpdated(val inner: GroupUpdateMessage): ControlMessage() { +class GroupUpdated @JvmOverloads constructor( + val inner: GroupUpdateMessage = GroupUpdateMessage.getDefaultInstance() +): ControlMessage() { override fun isValid(): Boolean { return true // TODO: add the validation here diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt index e56ab2c3be..c658847831 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt @@ -6,7 +6,6 @@ import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch import network.loki.messenger.libsession_util.util.ExpiryMode import network.loki.messenger.libsession_util.util.Sodium -import network.loki.messenger.libsession_util.util.afterSend import org.session.libsession.avatars.AvatarHelper import org.session.libsession.database.userAuth import org.session.libsession.messaging.MessagingModuleConfiguration @@ -660,7 +659,7 @@ private fun handlePromotionMessage(message: GroupUpdated) { GlobalScope.launch { try { MessagingModuleConfiguration.shared.groupManagerV2 - .onReceivePromotion( + .handlePromotion( groupId = AccountId(IdPrefix.GROUP, keyPair.pubKey), groupName = promotion.name, adminKey = keyPair.secretKey, @@ -703,7 +702,7 @@ private fun MessageReceiver.handleNewLibSessionClosedGroupMessage(message: Group GlobalScope.launch { try { MessagingModuleConfiguration.shared.groupManagerV2 - .onReceiveInvitation( + .handleInvitation( groupId = groupId, groupName = invite.name, authData = invite.memberAuthData.toByteArray(), diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt index 98005cf0c0..3a4123e5af 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt @@ -42,6 +42,7 @@ class ClosedGroupPoller( ) { companion object { private const val POLL_INTERVAL = 3_000L + private const val POLL_ERROR_RETRY_DELAY = 10_000L private const val TAG = "ClosedGroupPoller" } @@ -54,34 +55,43 @@ class ClosedGroupPoller( Log.d(TAG, "Starting closed group poller for ${closedGroupSessionId.hexString.take(4)}") job?.cancel() job = scope.launch(executor) { - var snode: Snode? = null - while (isActive) { - configFactoryProtocol.getClosedGroup(closedGroupSessionId) ?: break + try { + val swarmNodes = SnodeAPI.getSwarm(closedGroupSessionId.hexString).await().toMutableSet() + var currentSnode: Snode? = null - if (snode == null) { - Log.i(TAG, "No Snode, fetching one") - snode = SnodeAPI.getSingleTargetSnode(closedGroupSessionId.hexString).await() - } + while (isActive) { + if (currentSnode == null) { + check(swarmNodes.isNotEmpty()) { "No swarm nodes found" } + Log.d(TAG, "No current snode, getting a new one. Remaining in pool = ${swarmNodes.size - 1}") + currentSnode = swarmNodes.random() + swarmNodes.remove(currentSnode) + } - val nextPoll = runCatching { poll(snode!!) } - when { - nextPoll.isFailure -> { - Log.e(TAG, "Error polling closed group", nextPoll.exceptionOrNull()) - // Clearing snode so we get a new one next time - snode = null - delay(POLL_INTERVAL) - } - - nextPoll.getOrNull() == null -> { - // assume null poll time means don't continue polling, either the group has been deleted or something else - Log.d(TAG, "Stopping the closed group poller") - break - } - - else -> { - delay(POLL_INTERVAL) + val result = runCatching { poll(currentSnode!!) } + when { + result.isSuccess -> { + delay(POLL_INTERVAL) + } + + result.isFailure -> { + val error = result.exceptionOrNull()!! + if (error is CancellationException) { + throw error + } + + Log.e(TAG, "Error polling closed group", error) + // Clearing snode so we get a new one next time + currentSnode = null + delay(POLL_INTERVAL) + } + } } + } catch (e: CancellationException) { + throw e + } catch (e: Exception) { + Log.e(TAG, "Error during group poller", e) + delay(POLL_ERROR_RETRY_DELAY) } } } diff --git a/libsession/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt b/libsession/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt index 91eb35fbfd..dd0fdec410 100644 --- a/libsession/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt @@ -296,22 +296,22 @@ object OnionRequestAPI { is Destination.Snode -> destination.snode is Destination.Server -> null } - return getPath(snodeToExclude).bind { path -> + return getPath(snodeToExclude).map { path -> guardSnode = path.first() // Encrypt in reverse order, i.e. the destination first - OnionRequestEncryption.encryptPayloadForDestination(payload, destination, version).bind { r -> + OnionRequestEncryption.encryptPayloadForDestination(payload, destination, version).let { r -> destinationSymmetricKey = r.symmetricKey // Recursively encrypt the layers of the onion (again in reverse order) encryptionResult = r @Suppress("NAME_SHADOWING") var path = path var rhs = destination - fun addLayer(): Promise { + fun addLayer(): EncryptionResult { return if (path.isEmpty()) { - Promise.of(encryptionResult) + encryptionResult } else { val lhs = Destination.Snode(path.last()) path = path.dropLast(1) - OnionRequestEncryption.encryptHop(lhs, rhs, encryptionResult).bind { r -> + OnionRequestEncryption.encryptHop(lhs, rhs, encryptionResult).let { r -> encryptionResult = r rhs = lhs addLayer() diff --git a/libsession/src/main/java/org/session/libsession/snode/OnionRequestEncryption.kt b/libsession/src/main/java/org/session/libsession/snode/OnionRequestEncryption.kt index 888d2c1d5f..e762fefcfe 100644 --- a/libsession/src/main/java/org/session/libsession/snode/OnionRequestEncryption.kt +++ b/libsession/src/main/java/org/session/libsession/snode/OnionRequestEncryption.kt @@ -38,57 +38,53 @@ object OnionRequestEncryption { payload: ByteArray, destination: Destination, version: Version - ): Promise { - return GlobalScope.asyncPromise { - val plaintext = if (version == Version.V4) { - payload - } else { - // Wrapping isn't needed for file server or open group onion requests - when (destination) { - is Destination.Snode -> encode(payload, mapOf("headers" to "")) - is Destination.Server -> payload - } + ): EncryptionResult { + val plaintext = if (version == Version.V4) { + payload + } else { + // Wrapping isn't needed for file server or open group onion requests + when (destination) { + is Destination.Snode -> encode(payload, mapOf("headers" to "")) + is Destination.Server -> payload } - val x25519PublicKey = when (destination) { - is Destination.Snode -> destination.snode.publicKeySet!!.x25519Key - is Destination.Server -> destination.x25519PublicKey - } - AESGCM.encrypt(plaintext, x25519PublicKey) } + val x25519PublicKey = when (destination) { + is Destination.Snode -> destination.snode.publicKeySet!!.x25519Key + is Destination.Server -> destination.x25519PublicKey + } + return AESGCM.encrypt(plaintext, x25519PublicKey) } /** * Encrypts the previous encryption result (i.e. that of the hop after this one) for this hop. Use this to build the layers of an onion request. */ - internal fun encryptHop(lhs: Destination, rhs: Destination, previousEncryptionResult: EncryptionResult): Promise { - return GlobalScope.asyncPromise { - val payload: MutableMap = when (rhs) { - is Destination.Snode -> { - mutableMapOf("destination" to rhs.snode.publicKeySet!!.ed25519Key) - } - - is Destination.Server -> { - mutableMapOf( - "host" to rhs.host, - "target" to rhs.target, - "method" to "POST", - "protocol" to rhs.scheme, - "port" to rhs.port - ) - } + internal fun encryptHop(lhs: Destination, rhs: Destination, previousEncryptionResult: EncryptionResult): EncryptionResult { + val payload: MutableMap = when (rhs) { + is Destination.Snode -> { + mutableMapOf("destination" to rhs.snode.publicKeySet!!.ed25519Key) } - payload["ephemeral_key"] = previousEncryptionResult.ephemeralPublicKey.toHexString() - val x25519PublicKey = when (lhs) { - is Destination.Snode -> { - lhs.snode.publicKeySet!!.x25519Key - } - is Destination.Server -> { - lhs.x25519PublicKey - } + is Destination.Server -> { + mutableMapOf( + "host" to rhs.host, + "target" to rhs.target, + "method" to "POST", + "protocol" to rhs.scheme, + "port" to rhs.port + ) } - val plaintext = encode(previousEncryptionResult.ciphertext, payload) - AESGCM.encrypt(plaintext, x25519PublicKey) } + payload["ephemeral_key"] = previousEncryptionResult.ephemeralPublicKey.toHexString() + val x25519PublicKey = when (lhs) { + is Destination.Snode -> { + lhs.snode.publicKeySet!!.x25519Key + } + + is Destination.Server -> { + lhs.x25519PublicKey + } + } + val plaintext = encode(previousEncryptionResult.ciphertext, payload) + return AESGCM.encrypt(plaintext, x25519PublicKey) } } diff --git a/libsession/src/main/java/org/session/libsession/snode/utilities/PromiseUtil.kt b/libsession/src/main/java/org/session/libsession/snode/utilities/PromiseUtil.kt index bed3163f0d..17e6f696b9 100644 --- a/libsession/src/main/java/org/session/libsession/snode/utilities/PromiseUtil.kt +++ b/libsession/src/main/java/org/session/libsession/snode/utilities/PromiseUtil.kt @@ -14,7 +14,7 @@ import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine -suspend fun Promise.await(): T { +suspend inline fun Promise.await(): T { return suspendCoroutine { cont -> success(cont::resume) fail(cont::resumeWithException) diff --git a/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt b/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt index 9b24484408..6a912f4bf7 100644 --- a/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt @@ -31,7 +31,11 @@ interface ConfigFactoryProtocol { fun mergeUserConfigs(userConfigType: UserConfigType, messages: List) fun withGroupConfigs(groupId: AccountId, cb: (GroupConfigs) -> T): T - fun withMutableGroupConfigs(groupId: AccountId, cb: (MutableGroupConfigs) -> T): T + + /** + * @param recreateConfigInstances If true, the group configs will be recreated before calling the callback. This is useful when you have received an admin key or otherwise. + */ + fun withMutableGroupConfigs(groupId: AccountId, recreateConfigInstances: Boolean = false, cb: (MutableGroupConfigs) -> T): T fun conversationInConfig(publicKey: String?, groupPublicKey: String?, openGroupId: String?, visibleOnly: Boolean): Boolean fun canPerformChange(variant: String, publicKey: String, changeTimestampMs: Long): Boolean