fix: attempting to fix preventing message processing if group thread is not active for closed groups and initial contact dump only allows conversations with thread, may need further optimisations though

This commit is contained in:
0x330a 2023-06-16 17:40:19 +10:00
parent b5eaed3cbb
commit dce89a0f5f
4 changed files with 20 additions and 9 deletions

View File

@ -93,6 +93,7 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
fun cancelPendingMessageSendJobs(threadID: Long) { fun cancelPendingMessageSendJobs(threadID: Long) {
val database = databaseHelper.writableDatabase val database = databaseHelper.writableDatabase
val attachmentUploadJobKeys = mutableListOf<String>() val attachmentUploadJobKeys = mutableListOf<String>()
database.beginTransaction()
database.getAll(sessionJobTable, "$jobType = ?", arrayOf( AttachmentUploadJob.KEY )) { cursor -> database.getAll(sessionJobTable, "$jobType = ?", arrayOf( AttachmentUploadJob.KEY )) { cursor ->
val job = jobFromCursor(cursor) as AttachmentUploadJob? val job = jobFromCursor(cursor) as AttachmentUploadJob?
if (job != null && job.threadID == threadID.toString()) { attachmentUploadJobKeys.add(job.id!!) } if (job != null && job.threadID == threadID.toString()) { attachmentUploadJobKeys.add(job.id!!) }
@ -103,16 +104,20 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
if (job != null && job.message.threadID == threadID) { messageSendJobKeys.add(job.id!!) } if (job != null && job.message.threadID == threadID) { messageSendJobKeys.add(job.id!!) }
} }
if (attachmentUploadJobKeys.isNotEmpty()) { if (attachmentUploadJobKeys.isNotEmpty()) {
val attachmentUploadJobKeysAsString = attachmentUploadJobKeys.joinToString(", ") attachmentUploadJobKeys.forEach {
database.delete(sessionJobTable, "${Companion.jobType} = ? AND ${Companion.jobID} IN (?)", database.delete(sessionJobTable, "${Companion.jobType} = ? AND ${Companion.jobID} = ?",
arrayOf( AttachmentUploadJob.KEY, attachmentUploadJobKeysAsString )) arrayOf( AttachmentUploadJob.KEY, it ))
}
} }
if (messageSendJobKeys.isNotEmpty()) { if (messageSendJobKeys.isNotEmpty()) {
val messageSendJobKeysAsString = messageSendJobKeys.joinToString(", ") messageSendJobKeys.forEach {
database.delete(sessionJobTable, "${Companion.jobType} = ? AND ${Companion.jobID} IN (?)", database.delete(sessionJobTable, "${Companion.jobType} = ? AND ${Companion.jobID} = ?",
arrayOf( MessageSendJob.KEY, messageSendJobKeysAsString )) arrayOf( MessageSendJob.KEY, it ))
} }
} }
database.setTransactionSuccessful()
database.endTransaction()
}
fun isJobCanceled(job: Job): Boolean { fun isJobCanceled(job: Job): Boolean {
val database = databaseHelper.readableDatabase val database = databaseHelper.readableDatabase

View File

@ -148,6 +148,7 @@ object ConfigurationMessageUtilities {
val localUserKey = storage.getUserPublicKey() ?: return null val localUserKey = storage.getUserPublicKey() ?: return null
val contactsWithSettings = storage.getAllContacts().filter { recipient -> val contactsWithSettings = storage.getAllContacts().filter { recipient ->
recipient.sessionID != localUserKey && recipient.sessionID.startsWith(IdPrefix.STANDARD.value) recipient.sessionID != localUserKey && recipient.sessionID.startsWith(IdPrefix.STANDARD.value)
&& storage.getThreadId(recipient.sessionID) != null
}.map { contact -> }.map { contact ->
val address = Address.fromSerialized(contact.sessionID) val address = Address.fromSerialized(contact.sessionID)
val thread = storage.getThreadId(address) val thread = storage.getThreadId(address)

View File

@ -7,7 +7,6 @@ import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import nl.komponents.kovenant.Promise import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.task import nl.komponents.kovenant.task
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.messages.Message import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.control.CallMessage import org.session.libsession.messaging.messages.control.CallMessage
@ -103,12 +102,13 @@ class BatchMessageReceiveJob(
val context = MessagingModuleConfiguration.shared.context val context = MessagingModuleConfiguration.shared.context
val localUserPublicKey = storage.getUserPublicKey() val localUserPublicKey = storage.getUserPublicKey()
val serverPublicKey = openGroupID?.let { storage.getOpenGroupPublicKey(it.split(".").dropLast(1).joinToString(".")) } val serverPublicKey = openGroupID?.let { storage.getOpenGroupPublicKey(it.split(".").dropLast(1).joinToString(".")) }
val currentClosedGroups = storage.getAllActiveClosedGroupPublicKeys()
// parse and collect IDs // parse and collect IDs
messages.forEach { messageParameters -> messages.forEach { messageParameters ->
val (data, serverHash, openGroupMessageServerID) = messageParameters val (data, serverHash, openGroupMessageServerID) = messageParameters
try { try {
val (message, proto) = MessageReceiver.parse(data, openGroupMessageServerID, openGroupPublicKey = serverPublicKey) val (message, proto) = MessageReceiver.parse(data, openGroupMessageServerID, openGroupPublicKey = serverPublicKey, currentClosedGroups = currentClosedGroups)
message.serverHash = serverHash message.serverHash = serverHash
val parsedParams = ParsedMessage(messageParameters, message, proto) val parsedParams = ParsedMessage(messageParameters, message, proto)
val threadID = Message.getThreadId(message, openGroupID, storage, shouldCreateThread(parsedParams)) ?: NO_THREAD_MAPPING val threadID = Message.getThreadId(message, openGroupID, storage, shouldCreateThread(parsedParams)) ?: NO_THREAD_MAPPING

View File

@ -35,13 +35,14 @@ object MessageReceiver {
object NoThread: Error("Couldn't find thread for message.") object NoThread: Error("Couldn't find thread for message.")
object SelfSend: Error("Message addressed at self.") object SelfSend: Error("Message addressed at self.")
object InvalidGroupPublicKey: Error("Invalid group public key.") object InvalidGroupPublicKey: Error("Invalid group public key.")
object NoGroupThread: Error("No thread exists for this group.")
object NoGroupKeyPair: Error("Missing group key pair.") object NoGroupKeyPair: Error("Missing group key pair.")
object NoUserED25519KeyPair : Error("Couldn't find user ED25519 key pair.") object NoUserED25519KeyPair : Error("Couldn't find user ED25519 key pair.")
internal val isRetryable: Boolean = when (this) { internal val isRetryable: Boolean = when (this) {
is DuplicateMessage, is InvalidMessage, is UnknownMessage, is DuplicateMessage, is InvalidMessage, is UnknownMessage,
is UnknownEnvelopeType, is InvalidSignature, is NoData, is UnknownEnvelopeType, is InvalidSignature, is NoData,
is SenderBlocked, is SelfSend -> false is SenderBlocked, is SelfSend, is NoGroupThread -> false
else -> true else -> true
} }
} }
@ -52,6 +53,7 @@ object MessageReceiver {
isOutgoing: Boolean? = null, isOutgoing: Boolean? = null,
otherBlindedPublicKey: String? = null, otherBlindedPublicKey: String? = null,
openGroupPublicKey: String? = null, openGroupPublicKey: String? = null,
currentClosedGroups: Set<String>?
): Pair<Message, SignalServiceProtos.Content> { ): Pair<Message, SignalServiceProtos.Content> {
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
val userPublicKey = storage.getUserPublicKey() val userPublicKey = storage.getUserPublicKey()
@ -172,6 +174,9 @@ object MessageReceiver {
// If the message failed to process the first time around we retry it later (if the error is retryable). In this case the timestamp // If the message failed to process the first time around we retry it later (if the error is retryable). In this case the timestamp
// will already be in the database but we don't want to treat the message as a duplicate. The isRetry flag is a simple workaround // will already be in the database but we don't want to treat the message as a duplicate. The isRetry flag is a simple workaround
// for this issue. // for this issue.
if (groupPublicKey != null && groupPublicKey !in (currentClosedGroups ?: emptySet())) {
throw Error.NoGroupThread
}
if ((message is ClosedGroupControlMessage && message.kind is ClosedGroupControlMessage.Kind.New) || message is SharedConfigurationMessage) { if ((message is ClosedGroupControlMessage && message.kind is ClosedGroupControlMessage.Kind.New) || message is SharedConfigurationMessage) {
// Allow duplicates in this case to avoid the following situation: // Allow duplicates in this case to avoid the following situation:
// • The app performed a background poll or received a push notification // • The app performed a background poll or received a push notification