fix: fix up the duplicate thread creation in the message receive handler

This commit is contained in:
0x330a
2023-04-13 01:31:25 +10:00
parent 6d37fb29ab
commit 2ebd6ebf64
11 changed files with 108 additions and 75 deletions

View File

@@ -155,7 +155,7 @@ interface StorageProtocol {
// Thread
fun getOrCreateThreadIdFor(address: Address): Long
fun getOrCreateThreadIdFor(publicKey: String, groupPublicKey: String?, openGroupID: String?): Long
fun getThreadIdFor(publicKey: String, groupPublicKey: String?, openGroupID: String?, createThread: Boolean): Long?
fun getThreadId(publicKeyOrOpenGroupID: String): Long?
fun getThreadId(openGroup: OpenGroup): Long?
fun getThreadId(address: Address): Long?

View File

@@ -10,6 +10,7 @@ import nl.komponents.kovenant.task
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.control.ClosedGroupControlMessage
import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
import org.session.libsession.messaging.messages.control.UnsendRequest
import org.session.libsession.messaging.messages.visible.ParsedMessage
@@ -61,13 +62,29 @@ class BatchMessageReceiveJob(
private val OPEN_GROUP_ID_KEY = "open_group_id"
}
private fun getThreadId(message: Message, storage: StorageProtocol): Long {
private fun shouldCreateThread(parsedMessage: ParsedMessage): Boolean {
val message = parsedMessage.message
return message is VisibleMessage
|| !message.isSenderSelf
// TODO: sort out which messages should create threads: message requests? group creation threads? visible messages? others? calls?
// || (message is ClosedGroupControlMessage && message.kind is ClosedGroupControlMessage.Kind.New) // this was creating threads for self send messages (i.e. synced group creation)
// if (message is ClosedGroupControlMessage && message.kind is ClosedGroupControlMessage.Kind.New) {
// return true
// }
// if (parsedMessage.message.isSenderSelf ) {
// // all the cases where we should add a self send creating the thread, i.e. group invite? visible message?
// }
// !parsedMessage.message.isSenderSelf || parsedMessage.message is VisibleMessage
}
private fun getThreadId(message: Message, storage: StorageProtocol, shouldCreateThread: Boolean): Long? {
val senderOrSync = when (message) {
is VisibleMessage -> message.syncTarget ?: message.sender!!
is ExpirationTimerUpdate -> message.syncTarget ?: message.sender!!
else -> message.sender!!
}
return storage.getOrCreateThreadIdFor(senderOrSync, message.groupPublicKey, openGroupID)
return storage.getThreadIdFor(senderOrSync, message.groupPublicKey, openGroupID, createThread = shouldCreateThread)
}
override suspend fun execute(dispatcherName: String) {
@@ -88,9 +105,9 @@ class BatchMessageReceiveJob(
try {
val (message, proto) = MessageReceiver.parse(data, openGroupMessageServerID, openGroupPublicKey = serverPublicKey)
message.serverHash = serverHash
val threadID = getThreadId(message, storage)
val parsedParams = ParsedMessage(messageParameters, message, proto)
if (!threadMap.containsKey(threadID)) {
val threadID = getThreadId(message, storage, shouldCreateThread(parsedParams))
if (threadID != null && !threadMap.containsKey(threadID)) {
threadMap[threadID] = mutableListOf(parsedParams)
} else {
threadMap[threadID]!! += parsedParams

View File

@@ -11,6 +11,7 @@ abstract class Message {
var receivedTimestamp: Long? = null
var recipient: String? = null
var sender: String? = null
var isSenderSelf: Boolean = false
var groupPublicKey: String? = null
var openGroupServerMessageID: Long? = null
var serverHash: String? = null

View File

@@ -149,6 +149,9 @@ object MessageReceiver {
if (!message.isSelfSendValid && (sender == userPublicKey || isUserBlindedSender)) {
throw Error.SelfSend
}
if (sender == userPublicKey || isUserBlindedSender) {
message.isSenderSelf = true
}
// Guard against control messages in open groups
if (isOpenGroupMessage && message !is VisibleMessage) {
throw Error.InvalidMessage

View File

@@ -440,7 +440,7 @@ object MessageSender {
}
fun sendNonDurably(message: Message, address: Address): Promise<Unit, Exception> {
val threadID = MessagingModuleConfiguration.shared.storage.getOrCreateThreadIdFor(address)
val threadID = MessagingModuleConfiguration.shared.storage.getThreadId(address)
message.threadID = threadID
val destination = Destination.from(address)
return send(message, destination)

View File

@@ -232,11 +232,9 @@ fun MessageReceiver.handleVisibleMessage(
// Get or create thread
// FIXME: In case this is an open group this actually * doesn't * create the thread if it doesn't yet
// exist. This is intentional, but it's very non-obvious.
val threadID = storage.getOrCreateThreadIdFor(message.syncTarget ?: messageSender!!, message.groupPublicKey, openGroupID)
if (threadID < 0) {
val threadID = storage.getThreadIdFor(message.syncTarget ?: messageSender!!, message.groupPublicKey, openGroupID, createThread = true)
// Thread doesn't exist; should only be reached in a case where we are processing open group messages for a no longer existent thread
throw MessageReceiver.Error.NoThread
}
?: throw MessageReceiver.Error.NoThread
val threadRecipient = storage.getRecipientForThread(threadID)
val userBlindedKey = openGroupID?.let {
val openGroup = storage.getOpenGroup(threadID) ?: return@let null