Send message improvement

This commit is contained in:
SessionHero01 2024-10-03 15:29:56 +10:00
parent 6ab4ff2e28
commit 08ee07f5c5
No known key found for this signature in database
3 changed files with 81 additions and 89 deletions

View File

@ -3,9 +3,7 @@ package org.thoughtcrime.securesms.groups
import android.content.Context import android.content.Context
import com.google.protobuf.ByteString import com.google.protobuf.ByteString
import dagger.hilt.android.qualifiers.ApplicationContext import dagger.hilt.android.qualifiers.ApplicationContext
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async import kotlinx.coroutines.async
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
@ -33,6 +31,7 @@ import org.session.libsession.messaging.utilities.MessageAuthentication.buildMem
import org.session.libsession.messaging.utilities.SodiumUtilities import org.session.libsession.messaging.utilities.SodiumUtilities
import org.session.libsession.snode.OwnedSwarmAuth import org.session.libsession.snode.OwnedSwarmAuth
import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeClock
import org.session.libsession.snode.SnodeMessage import org.session.libsession.snode.SnodeMessage
import org.session.libsession.snode.model.BatchResponse import org.session.libsession.snode.model.BatchResponse
import org.session.libsession.snode.utilities.await import org.session.libsession.snode.utilities.await
@ -70,6 +69,7 @@ class GroupManagerV2Impl @Inject constructor(
private val pollerFactory: PollerFactory, private val pollerFactory: PollerFactory,
private val profileManager: SSKEnvironment.ProfileManagerProtocol, private val profileManager: SSKEnvironment.ProfileManagerProtocol,
@ApplicationContext val application: Context, @ApplicationContext val application: Context,
private val clock: SnodeClock,
) : GroupManagerV2 { ) : GroupManagerV2 {
private val dispatcher = Dispatchers.Default private val dispatcher = Dispatchers.Default
@ -95,7 +95,7 @@ class GroupManagerV2Impl @Inject constructor(
requireNotNull(storage.getUserPublicKey()) { "Our account ID is not available" } requireNotNull(storage.getUserPublicKey()) { "Our account ID is not available" }
val ourProfile = storage.getUserProfile() val ourProfile = storage.getUserProfile()
val groupCreationTimestamp = SnodeAPI.nowWithOffset val groupCreationTimestamp = clock.currentTimeMills()
// Create a group in the user groups config // Create a group in the user groups config
val group = configFactory.withMutableUserConfigs { configs -> val group = configFactory.withMutableUserConfigs { configs ->
@ -231,7 +231,7 @@ class GroupManagerV2Impl @Inject constructor(
recipient = group.hexString, recipient = group.hexString,
data = Base64.encodeBytes(memberKey), data = Base64.encodeBytes(memberKey),
ttl = SnodeMessage.CONFIG_TTL, ttl = SnodeMessage.CONFIG_TTL,
timestamp = SnodeAPI.nowWithOffset, timestamp = clock.currentTimeMills(),
), ),
auth = groupAuth, auth = groupAuth,
) )
@ -266,7 +266,7 @@ class GroupManagerV2Impl @Inject constructor(
) )
// Send a member change message to the group // Send a member change message to the group
val timestamp = SnodeAPI.nowWithOffset val timestamp = clock.currentTimeMills()
val signature = SodiumUtilities.sign( val signature = SodiumUtilities.sign(
buildMemberChangeSignature(GroupUpdateMemberChangeMessage.Type.ADDED, timestamp), buildMemberChangeSignature(GroupUpdateMemberChangeMessage.Type.ADDED, timestamp),
adminKey adminKey
@ -283,7 +283,6 @@ class GroupManagerV2Impl @Inject constructor(
.build() .build()
).apply { this.sentTimestamp = timestamp } ).apply { this.sentTimestamp = timestamp }
MessageSender.send(updatedMessage, Address.fromSerialized(group.hexString)) MessageSender.send(updatedMessage, Address.fromSerialized(group.hexString))
storage.insertGroupInfoChange(updatedMessage, group)
} }
override suspend fun removeMembers( override suspend fun removeMembers(
@ -444,7 +443,7 @@ class GroupManagerV2Impl @Inject constructor(
} }
// Send a group update message to the group telling members someone has been promoted // Send a group update message to the group telling members someone has been promoted
val timestamp = SnodeAPI.nowWithOffset val timestamp = clock.currentTimeMills()
val signature = SodiumUtilities.sign( val signature = SodiumUtilities.sign(
buildMemberChangeSignature(GroupUpdateMemberChangeMessage.Type.PROMOTED, timestamp), buildMemberChangeSignature(GroupUpdateMemberChangeMessage.Type.PROMOTED, timestamp),
adminKey adminKey
@ -463,7 +462,6 @@ class GroupManagerV2Impl @Inject constructor(
} }
MessageSender.send(message, Address.fromSerialized(group.hexString)) MessageSender.send(message, Address.fromSerialized(group.hexString))
storage.insertGroupInfoChange(message, group)
} }
private suspend fun flagMembersForRemoval( private suspend fun flagMembersForRemoval(
@ -485,7 +483,7 @@ class GroupManagerV2Impl @Inject constructor(
// 2. Send a member change message // 2. Send a member change message
if (sendMemberChangeMessage) { if (sendMemberChangeMessage) {
val timestamp = SnodeAPI.nowWithOffset val timestamp = clock.currentTimeMills()
val signature = SodiumUtilities.sign( val signature = SodiumUtilities.sign(
buildMemberChangeSignature( buildMemberChangeSignature(
GroupUpdateMemberChangeMessage.Type.REMOVED, GroupUpdateMemberChangeMessage.Type.REMOVED,
@ -698,7 +696,7 @@ class GroupManagerV2Impl @Inject constructor(
} else { } else {
lokiDatabase.addGroupInviteReferrer(groupThreadId, inviter.hexString) lokiDatabase.addGroupInviteReferrer(groupThreadId, inviter.hexString)
storage.insertGroupInviteControlMessage( storage.insertGroupInviteControlMessage(
SnodeAPI.nowWithOffset, clock.currentTimeMills(),
inviter.hexString, inviter.hexString,
groupId, groupId,
groupName groupName
@ -765,7 +763,7 @@ class GroupManagerV2Impl @Inject constructor(
name = groupName, name = groupName,
members = emptyList(), members = emptyList(),
admins = emptyList(), admins = emptyList(),
sentTimestamp = SnodeAPI.nowWithOffset, sentTimestamp = clock.currentTimeMills(),
) )
} }
@ -777,7 +775,7 @@ class GroupManagerV2Impl @Inject constructor(
it.groupInfo.setName(newName) it.groupInfo.setName(newName)
} }
val timestamp = SnodeAPI.nowWithOffset val timestamp = clock.currentTimeMills()
val signature = SodiumUtilities.sign( val signature = SodiumUtilities.sign(
buildInfoChangeVerifier(GroupUpdateInfoChangeMessage.Type.NAME, timestamp), buildInfoChangeVerifier(GroupUpdateInfoChangeMessage.Type.NAME, timestamp),
adminKey adminKey
@ -796,7 +794,7 @@ class GroupManagerV2Impl @Inject constructor(
sentTimestamp = timestamp sentTimestamp = timestamp
} }
MessageSender.sendNonDurably(message, Address.fromSerialized(groupId.hexString), false) MessageSender.send(message, Destination.ClosedGroup(groupId.hexString), false)
.await() .await()
storage.insertGroupInfoChange(message, groupId) storage.insertGroupInfoChange(message, groupId)
} }
@ -839,7 +837,7 @@ class GroupManagerV2Impl @Inject constructor(
} }
// Construct a message to ask members to delete the messages, sign if we are admin, then send // Construct a message to ask members to delete the messages, sign if we are admin, then send
val timestamp = SnodeAPI.nowWithOffset val timestamp = clock.currentTimeMills()
val signature = group.adminKey?.let { key -> val signature = group.adminKey?.let { key ->
SodiumUtilities.sign( SodiumUtilities.sign(
buildDeleteMemberContentSignature( buildDeleteMemberContentSignature(
@ -910,7 +908,7 @@ class GroupManagerV2Impl @Inject constructor(
} }
val adminKey = configFactory.getClosedGroup(groupId)?.adminKey val adminKey = configFactory.getClosedGroup(groupId)?.adminKey
if (!senderIsVerifiedAdmin && adminKey != null) { if (!senderIsVerifiedAdmin && adminKey != null && hashes.isNotEmpty()) {
// If the deletion request comes from a non-admin, and we as an admin, will also delete // If the deletion request comes from a non-admin, and we as an admin, will also delete
// the content from the swarm, provided that the messages are actually sent by that user // the content from the swarm, provided that the messages are actually sent by that user
if (storage.ensureMessageHashesAreSender( if (storage.ensureMessageHashesAreSender(

View File

@ -1,5 +1,9 @@
package org.session.libsession.messaging.sending_receiving package org.session.libsession.messaging.sending_receiving
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import network.loki.messenger.libsession_util.util.ExpiryMode import network.loki.messenger.libsession_util.util.ExpiryMode
import nl.komponents.kovenant.Promise import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred import nl.komponents.kovenant.deferred
@ -30,6 +34,8 @@ import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeAPI.nowWithOffset import org.session.libsession.snode.SnodeAPI.nowWithOffset
import org.session.libsession.snode.SnodeMessage import org.session.libsession.snode.SnodeMessage
import org.session.libsession.snode.SnodeModule import org.session.libsession.snode.SnodeModule
import org.session.libsession.snode.utilities.asyncPromise
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.Address import org.session.libsession.utilities.Address
import org.session.libsession.utilities.Device import org.session.libsession.utilities.Device
import org.session.libsession.utilities.GroupUtil import org.session.libsession.utilities.GroupUtil
@ -77,9 +83,11 @@ object MessageSender {
return if (destination is Destination.LegacyOpenGroup || destination is Destination.OpenGroup || destination is Destination.OpenGroupInbox) { return if (destination is Destination.LegacyOpenGroup || destination is Destination.OpenGroup || destination is Destination.OpenGroupInbox) {
sendToOpenGroupDestination(destination, message) sendToOpenGroupDestination(destination, message)
} else { } else {
GlobalScope.asyncPromise {
sendToSnodeDestination(destination, message, isSyncMessage) sendToSnodeDestination(destination, message, isSyncMessage)
} }
} }
}
fun buildConfigMessageToSnode(destinationPubKey: String, message: SharedConfigurationMessage): SnodeMessage { fun buildConfigMessageToSnode(destinationPubKey: String, message: SharedConfigurationMessage): SnodeMessage {
return SnodeMessage( return SnodeMessage(
@ -207,9 +215,7 @@ object MessageSender {
} }
// One-on-One Chats & Closed Groups // One-on-One Chats & Closed Groups
private fun sendToSnodeDestination(destination: Destination, message: Message, isSyncMessage: Boolean = false): Promise<Unit, Exception> { private suspend fun sendToSnodeDestination(destination: Destination, message: Message, isSyncMessage: Boolean = false) = supervisorScope {
val deferred = deferred<Unit, Exception>()
val promise = deferred.promise
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
val configFactory = MessagingModuleConfiguration.shared.configFactory val configFactory = MessagingModuleConfiguration.shared.configFactory
val userPublicKey = storage.getUserPublicKey() val userPublicKey = storage.getUserPublicKey()
@ -223,7 +229,7 @@ object MessageSender {
if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend()) { if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend()) {
SnodeModule.shared.broadcaster.broadcast("messageFailed", message.sentTimestamp!!) SnodeModule.shared.broadcaster.broadcast("messageFailed", message.sentTimestamp!!)
} }
deferred.reject(error) throw error
} }
try { try {
val snodeMessage = buildWrappedMessageToSnode(destination, message, isSyncMessage) val snodeMessage = buildWrappedMessageToSnode(destination, message, isSyncMessage)
@ -242,57 +248,43 @@ object MessageSender {
else -> listOf(Namespace.DEFAULT()) else -> listOf(Namespace.DEFAULT())
} }
namespaces.mapNotNull { namespace ->
if (destination is Destination.ClosedGroup) {
// possibly handle a failure for no user groups or no closed group signing key?
val groupAuth = configFactory.getGroupAuth(AccountId(destination.publicKey)) ?: return@mapNotNull null
val sendTasks = namespaces.map { namespace ->
if (destination is Destination.ClosedGroup) {
val groupAuth = requireNotNull(configFactory.getGroupAuth(AccountId(destination.publicKey))) {
"Unable to authorize group message send"
}
async {
SnodeAPI.sendMessage( SnodeAPI.sendMessage(
auth = groupAuth, auth = groupAuth,
message = snodeMessage, message = snodeMessage,
namespace = namespace namespace = namespace
) )
}
} else { } else {
async {
SnodeAPI.sendMessage(snodeMessage, auth = null, namespace = namespace) SnodeAPI.sendMessage(snodeMessage, auth = null, namespace = namespace)
} }
}.let { promises -> }
var isSuccess = false }
val promiseCount = promises.size
val errorCount = AtomicInteger(0) val sendTaskResults = sendTasks.map {
promises.forEach { promise: RawResponsePromise -> runCatching { it.await() }
promise.success { }
if (isSuccess) { return@success } // Succeed as soon as the first promise succeeds
isSuccess = true val firstSuccess = sendTaskResults.firstOrNull { it.isSuccess }
val hash = it["hash"] as? String
message.serverHash = hash if (firstSuccess != null) {
message.serverHash = firstSuccess.getOrThrow().hash
handleSuccessfulMessageSend(message, destination, isSyncMessage) handleSuccessfulMessageSend(message, destination, isSyncMessage)
} else {
val shouldNotify: Boolean = when (message) { // If all tasks failed, throw the first exception
is VisibleMessage, is UnsendRequest -> !isSyncMessage throw sendTaskResults.first().exceptionOrNull()!!
is CallMessage -> {
// Note: Other 'CallMessage' types are too big to send as push notifications
// so only send the 'preOffer' message as a notification
when (message.type) {
SignalServiceProtos.CallMessage.Type.PRE_OFFER -> true
else -> false
}
}
else -> false
}
deferred.resolve(Unit)
}
promise.fail {
errorCount.getAndIncrement()
if (errorCount.get() != promiseCount) { return@fail } // Only error out if all promises failed
handleFailure(it)
}
}
} }
} catch (exception: Exception) { } catch (exception: Exception) {
handleFailure(exception) handleFailure(exception)
} }
return promise
} }
private fun getSpecifiedTtl( private fun getSpecifiedTtl(
@ -511,9 +503,11 @@ object MessageSender {
if (message is ExpirationTimerUpdate) message.syncTarget = destination.publicKey if (message is ExpirationTimerUpdate) message.syncTarget = destination.publicKey
storage.markAsSyncing(timestamp, userPublicKey) storage.markAsSyncing(timestamp, userPublicKey)
GlobalScope.launch {
sendToSnodeDestination(Destination.Contact(userPublicKey), message, true) sendToSnodeDestination(Destination.Contact(userPublicKey), message, true)
} }
} }
}
fun handleFailedMessageSend(message: Message, error: Exception, isSyncMessage: Boolean = false) { fun handleFailedMessageSend(message: Message, error: Exception, isSyncMessage: Boolean = false) {
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage

View File

@ -18,17 +18,16 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.select import kotlinx.coroutines.selects.select
import kotlinx.coroutines.withContext
import nl.komponents.kovenant.Promise import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.all import nl.komponents.kovenant.all
import nl.komponents.kovenant.functional.bind import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map import nl.komponents.kovenant.functional.map
import nl.komponents.kovenant.task
import nl.komponents.kovenant.unwrap import nl.komponents.kovenant.unwrap
import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.utilities.MessageWrapper import org.session.libsession.messaging.utilities.MessageWrapper
import org.session.libsession.messaging.utilities.SodiumUtilities.sodium import org.session.libsession.messaging.utilities.SodiumUtilities.sodium
import org.session.libsession.snode.model.BatchResponse import org.session.libsession.snode.model.BatchResponse
import org.session.libsession.snode.model.StoreMessageResponse
import org.session.libsession.snode.utilities.asyncPromise import org.session.libsession.snode.utilities.asyncPromise
import org.session.libsession.snode.utilities.await import org.session.libsession.snode.utilities.await
import org.session.libsession.snode.utilities.retrySuspendAsPromise import org.session.libsession.snode.utilities.retrySuspendAsPromise
@ -48,6 +47,7 @@ import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Snode import org.session.libsignal.utilities.Snode
import org.session.libsignal.utilities.prettifiedDescription import org.session.libsignal.utilities.prettifiedDescription
import org.session.libsignal.utilities.retryIfNeeded import org.session.libsignal.utilities.retryIfNeeded
import org.session.libsignal.utilities.retryWithUniformInterval
import java.util.Date import java.util.Date
import java.util.Locale import java.util.Locale
import kotlin.collections.component1 import kotlin.collections.component1
@ -792,11 +792,12 @@ object SnodeAPI {
* Note: After this method returns, [auth] will not be used by any of async calls and it's afe * Note: After this method returns, [auth] will not be used by any of async calls and it's afe
* for the caller to clean up the associated resources if needed. * for the caller to clean up the associated resources if needed.
*/ */
fun sendMessage( suspend fun sendMessage(
message: SnodeMessage, message: SnodeMessage,
auth: SwarmAuth?, auth: SwarmAuth?,
namespace: Int = 0 namespace: Int = 0
): RawResponsePromise { ): StoreMessageResponse {
return retryWithUniformInterval(maxRetryCount = maxRetryCount) {
val params = if (auth != null) { val params = if (auth != null) {
check(auth.accountId.hexString == message.recipient) { check(auth.accountId.hexString == message.recipient) {
"Message sent to ${message.recipient} but authenticated with ${auth.accountId.hexString}" "Message sent to ${message.recipient} but authenticated with ${auth.accountId.hexString}"
@ -822,7 +823,6 @@ object SnodeAPI {
} }
} }
return scope.retrySuspendAsPromise(maxRetryCount) {
sendBatchRequest( sendBatchRequest(
snode = getSingleTargetSnode(message.recipient).await(), snode = getSingleTargetSnode(message.recipient).await(),
publicKey = message.recipient, publicKey = message.recipient,
@ -831,7 +831,7 @@ object SnodeAPI {
params = params, params = params,
namespace = namespace namespace = namespace
), ),
responseType = Map::class.java responseType = StoreMessageResponse::class.java
) )
} }
} }