From 4a2289646e10d5cdb3f8a9aeecad654f3292c514 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Wed, 7 Jun 2023 17:10:16 +1000 Subject: [PATCH] Updated the code to ignore messages invalidated by the config --- .../securesms/database/Storage.kt | 9 ++ .../securesms/database/ThreadDatabase.java | 18 +++ .../securesms/dependencies/ConfigFactory.kt | 41 ++++++ .../libsession/database/StorageProtocol.kt | 2 + .../messaging/jobs/BatchMessageReceiveJob.kt | 15 +-- .../messaging/jobs/MessageReceiveJob.kt | 4 +- .../libsession/messaging/messages/Message.kt | 14 ++ .../ReceivedMessageHandler.kt | 123 ++++++++++++++++-- .../pollers/OpenGroupPoller.kt | 4 +- .../utilities/ConfigFactoryProtocol.kt | 2 + 10 files changed, 208 insertions(+), 24 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index b2eea48cfc..6d6ca69670 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -419,6 +419,10 @@ open class Storage(context: Context, helper: SQLCipherOpenHelper, private val co notifyUpdates(forConfigObject) } + override fun conversationInConfig(publicKey: String?, groupPublicKey: String?, openGroupId: String?, visibleOnly: Boolean): Boolean { + return configFactory.conversationInConfig(publicKey, groupPublicKey, openGroupId, visibleOnly) + } + override fun canPerformConfigChange(variant: String, publicKey: String, changeTimestampMs: Long): Boolean { return configFactory.canPerformChange(variant, publicKey, changeTimestampMs) } @@ -1217,6 +1221,11 @@ open class Storage(context: Context, helper: SQLCipherOpenHelper, private val co recipientDb.setRecipientHash(recipient, recipientHash) } + override fun getThreadArchived(threadId: Long): Boolean { + val threadDB = DatabaseComponent.get(context).threadDatabase() + return threadDB.getThreadArchived(threadId) + } + override fun getLastUpdated(threadID: Long): Long { val threadDB = DatabaseComponent.get(context).threadDatabase() return threadDB.getLastUpdated(threadID) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java b/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java index 68b496fb5e..f440ecb644 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java +++ b/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java @@ -658,6 +658,24 @@ public class ThreadDatabase extends Database { return getOrCreateThreadIdFor(recipient, DistributionTypes.DEFAULT); } + public boolean getThreadArchived(long threadId) { + SQLiteDatabase db = databaseHelper.getReadableDatabase(); + Cursor cursor = null; + + try { + cursor = db.query(TABLE_NAME, null, ID + " = ?", new String[] {threadId+""}, null, null, null); + + if (cursor != null && cursor.moveToFirst()) { + return (cursor.getInt(cursor.getColumnIndexOrThrow(ARCHIVED)) == 1); + } + } finally { + if (cursor != null) + cursor.close(); + } + + return false; + } + public void setThreadArchived(long threadId) { ContentValues contentValues = new ContentValues(1); contentValues.put(ARCHIVED, 1); diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt index 7131a8adf7..52e24aab3f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt @@ -13,6 +13,8 @@ import org.session.libsession.utilities.TextSecurePreferences import org.session.libsignal.protos.SignalServiceProtos.SharedConfigMessage import org.session.libsignal.utilities.Log import org.thoughtcrime.securesms.database.ConfigDatabase +import org.thoughtcrime.securesms.dependencies.DatabaseComponent.Companion.get +import org.thoughtcrime.securesms.groups.GroupManager import org.thoughtcrime.securesms.util.ConfigurationMessageUtilities class ConfigFactory( @@ -189,6 +191,45 @@ class ConfigFactory( } } + override fun conversationInConfig( + publicKey: String?, + groupPublicKey: String?, + openGroupId: String?, + visibleOnly: Boolean + ): Boolean { + if (!ConfigBase.isNewConfigEnabled(isConfigForcedOn, SnodeAPI.nowWithOffset)) return true + + val (_, userPublicKey) = maybeGetUserInfo() ?: return true + + if (openGroupId != null) { + val userGroups = userGroups ?: return false + val threadId = GroupManager.getOpenGroupThreadID(openGroupId, context) + val openGroup = get(context).lokiThreadDatabase().getOpenGroupChat(threadId) ?: return false + + // Not handling the `hidden` behaviour for communities so just indicate the existence + return (userGroups.getCommunityInfo(openGroup.server, openGroup.room) != null) + } + else if (groupPublicKey != null) { + val userGroups = userGroups ?: return false + + // Not handling the `hidden` behaviour for legacy groups so just indicate the existence + return (userGroups.getLegacyGroupInfo(groupPublicKey) != null) + } + else if (publicKey == userPublicKey) { + val user = user ?: return false + + return (!visibleOnly || user.getNtsPriority() != ConfigBase.PRIORITY_HIDDEN) + } + else if (publicKey != null) { + val contacts = contacts ?: return false + val targetContact = contacts.get(publicKey) ?: return false + + return (!visibleOnly || targetContact.priority != ConfigBase.PRIORITY_HIDDEN) + } + + return false + } + override fun canPerformChange(variant: String, publicKey: String, changeTimestampMs: Long): Boolean { if (!ConfigBase.isNewConfigEnabled(isConfigForcedOn, SnodeAPI.nowWithOffset)) return true diff --git a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt index eb5779433f..722ad54a3f 100644 --- a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt @@ -166,6 +166,7 @@ interface StorageProtocol { fun getThreadId(address: Address): Long? fun getThreadId(recipient: Recipient): Long? fun getThreadIdForMms(mmsId: Long): Long + fun getThreadArchived(threadId: Long): Boolean fun getLastUpdated(threadID: Long): Long fun trimThread(threadID: Long, threadLimit: Int) fun trimThreadBefore(threadID: Long, timestamp: Long) @@ -224,5 +225,6 @@ interface StorageProtocol { // Shared configs fun notifyConfigUpdates(forConfigObject: ConfigBase) + fun conversationInConfig(publicKey: String?, groupPublicKey: String?, openGroupId: String?, visibleOnly: Boolean): Boolean fun canPerformConfigChange(variant: String, publicKey: String, changeTimestampMs: Long): Boolean } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/BatchMessageReceiveJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/BatchMessageReceiveJob.kt index 5bac4383f5..fc293724e9 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/BatchMessageReceiveJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/BatchMessageReceiveJob.kt @@ -92,15 +92,6 @@ class BatchMessageReceiveJob( } } - private fun getThreadId(message: Message, storage: StorageProtocol, shouldCreateThread: Boolean): Long? { - val senderOrSync = when (message) { - is VisibleMessage -> message.syncTarget ?: message.sender!! - is ExpirationTimerUpdate -> message.syncTarget ?: message.sender!! - else -> message.sender!! - } - return storage.getThreadIdFor(senderOrSync, message.groupPublicKey, openGroupID, createThread = shouldCreateThread) - } - override suspend fun execute(dispatcherName: String) { executeAsync(dispatcherName).get() } @@ -120,7 +111,7 @@ class BatchMessageReceiveJob( val (message, proto) = MessageReceiver.parse(data, openGroupMessageServerID, openGroupPublicKey = serverPublicKey) message.serverHash = serverHash val parsedParams = ParsedMessage(messageParameters, message, proto) - val threadID = getThreadId(message, storage, shouldCreateThread(parsedParams)) ?: NO_THREAD_MAPPING + val threadID = Message.getThreadId(message, openGroupID, storage, shouldCreateThread(parsedParams)) ?: NO_THREAD_MAPPING if (!threadMap.containsKey(threadID)) { threadMap[threadID] = mutableListOf(parsedParams) } else { @@ -179,7 +170,7 @@ class BatchMessageReceiveJob( } } val messageId = MessageReceiver.handleVisibleMessage( - message, proto, openGroupID, + message, proto, openGroupID, threadId, runThreadUpdate = false, runProfileUpdate = true ) @@ -209,7 +200,7 @@ class BatchMessageReceiveJob( } } - else -> MessageReceiver.handle(message, proto, openGroupID) + else -> MessageReceiver.handle(message, proto, threadId, openGroupID) } } catch (e: Exception) { Log.e(TAG, "Couldn't process message (id: $id)", e) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt index f3c4e70037..f07720e09f 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt @@ -3,6 +3,7 @@ package org.session.libsession.messaging.jobs import nl.komponents.kovenant.Promise import nl.komponents.kovenant.deferred import org.session.libsession.messaging.MessagingModuleConfiguration +import org.session.libsession.messaging.messages.Message import org.session.libsession.messaging.sending_receiving.MessageReceiver import org.session.libsession.messaging.sending_receiving.handle import org.session.libsession.messaging.utilities.Data @@ -37,8 +38,9 @@ class MessageReceiveJob(val data: ByteArray, val serverHash: String? = null, val MessagingModuleConfiguration.shared.storage.getOpenGroupPublicKey(it.split(".").dropLast(1).joinToString(".")) } val (message, proto) = MessageReceiver.parse(this.data, this.openGroupMessageServerID, openGroupPublicKey = serverPublicKey) + val threadId = Message.getThreadId(message, this.openGroupID, MessagingModuleConfiguration.shared.storage, false) message.serverHash = serverHash - MessageReceiver.handle(message, proto, this.openGroupID) + MessageReceiver.handle(message, proto, threadId ?: -1, this.openGroupID) this.handleSuccess(dispatcherName) deferred.resolve(Unit) } catch (e: Exception) { diff --git a/libsession/src/main/java/org/session/libsession/messaging/messages/Message.kt b/libsession/src/main/java/org/session/libsession/messaging/messages/Message.kt index db2a6f5c5a..dd1d5f1852 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/messages/Message.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/messages/Message.kt @@ -1,6 +1,9 @@ package org.session.libsession.messaging.messages import com.google.protobuf.ByteString +import org.session.libsession.database.StorageProtocol +import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate +import org.session.libsession.messaging.messages.visible.VisibleMessage import org.session.libsession.utilities.GroupUtil import org.session.libsignal.protos.SignalServiceProtos @@ -19,6 +22,17 @@ abstract class Message { open val ttl: Long = 14 * 24 * 60 * 60 * 1000 open val isSelfSendValid: Boolean = false + companion object { + fun getThreadId(message: Message, openGroupID: String?, storage: StorageProtocol, shouldCreateThread: Boolean): Long? { + val senderOrSync = when (message) { + is VisibleMessage -> message.syncTarget ?: message.sender!! + is ExpirationTimerUpdate -> message.syncTarget ?: message.sender!! + else -> message.sender!! + } + return storage.getThreadIdFor(senderOrSync, message.groupPublicKey, openGroupID, createThread = shouldCreateThread) + } + } + open fun isValid(): Boolean { val sentTimestamp = sentTimestamp if (sentTimestamp != null && sentTimestamp <= 0) { return false } diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt index 0abf354b72..8143a8baa8 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt @@ -60,22 +60,22 @@ internal fun MessageReceiver.isBlocked(publicKey: String): Boolean { return recipient.isBlocked } -fun MessageReceiver.handle(message: Message, proto: SignalServiceProtos.Content, openGroupID: String?) { +fun MessageReceiver.handle(message: Message, proto: SignalServiceProtos.Content, threadId: Long, openGroupID: String?) { when (message) { is ReadReceipt -> handleReadReceipt(message) is TypingIndicator -> handleTypingIndicator(message) is ClosedGroupControlMessage -> handleClosedGroupControlMessage(message) is ExpirationTimerUpdate -> handleExpirationTimerUpdate(message) - is DataExtractionNotification -> handleDataExtractionNotification(message) + is DataExtractionNotification -> handleDataExtractionNotification(message, threadId) is ConfigurationMessage -> handleConfigurationMessage(message) is UnsendRequest -> handleUnsendRequest(message) - is MessageRequestResponse -> handleMessageRequestResponse(message) + is MessageRequestResponse -> handleMessageRequestResponse(message, threadId) is VisibleMessage -> handleVisibleMessage( - message, proto, openGroupID, + message, proto, openGroupID, threadId, runThreadUpdate = true, runProfileUpdate = true ) - is CallMessage -> handleCallMessage(message) + is CallMessage -> handleCallMessage(message, threadId) } } @@ -85,7 +85,33 @@ private fun MessageReceiver.handleReadReceipt(message: ReadReceipt) { SSKEnvironment.shared.readReceiptManager.processReadReceipts(context, message.sender!!, message.timestamps!!, message.receivedTimestamp!!) } -private fun MessageReceiver.handleCallMessage(message: CallMessage) { +private fun MessageReceiver.handleCallMessage(message: CallMessage, threadId: Long) { + // Only process the message if the thread is not archived or it was sent after the libSession buffer period + val storage = MessagingModuleConfiguration.shared.storage + val userPublicKey = storage.getUserPublicKey()!! + val recipient = storage.getRecipientForThread(threadId) + val dbThreadIsVisible = ( + threadId > 0 && + recipient != null && + !recipient.isContactRecipient && + !storage.getThreadArchived(threadId) + ) + + if ( + !dbThreadIsVisible && + !storage.conversationInConfig( + recipient?.address?.serialize(), + null, + null, + true + ) && + !storage.canPerformConfigChange( + SharedConfigMessage.Kind.CONTACTS.name, + userPublicKey, + message.sentTimestamp!! + ) + ) { return } + // TODO: refactor this out to persistence, just to help debug the flow and send/receive in synchronous testing WebRtcUtils.SIGNAL_QUEUE.trySend(message) } @@ -126,11 +152,37 @@ private fun MessageReceiver.handleExpirationTimerUpdate(message: ExpirationTimer } } -private fun MessageReceiver.handleDataExtractionNotification(message: DataExtractionNotification) { +private fun MessageReceiver.handleDataExtractionNotification(message: DataExtractionNotification, threadId: Long) { // We don't handle data extraction messages for groups (they shouldn't be sent, but just in case we filter them here too) if (message.groupPublicKey != null) return val storage = MessagingModuleConfiguration.shared.storage val senderPublicKey = message.sender!! + + // Only process the message if the thread is not archived or it was sent after the libSession buffer period + val userPublicKey = storage.getUserPublicKey()!! + val recipient = storage.getRecipientForThread(threadId) + val dbThreadIsVisible = ( + threadId > 0 && + recipient != null && + !recipient.isContactRecipient && + !storage.getThreadArchived(threadId) + ) + + if ( + !dbThreadIsVisible && + !storage.conversationInConfig( + recipient?.address?.serialize(), + null, + null, + true + ) && + !storage.canPerformConfigChange( + if (recipient?.address?.serialize() == userPublicKey) SharedConfigMessage.Kind.USER_PROFILE.name else SharedConfigMessage.Kind.CONTACTS.name, + userPublicKey, + message.sentTimestamp!! + ) + ) { return } + val notification: DataExtractionNotificationInfoMessage = when(message.kind) { is DataExtractionNotification.Kind.Screenshot -> DataExtractionNotificationInfoMessage(DataExtractionNotificationInfoMessage.Kind.SCREENSHOT) is DataExtractionNotification.Kind.MediaSaved -> DataExtractionNotificationInfoMessage(DataExtractionNotificationInfoMessage.Kind.MEDIA_SAVED) @@ -215,7 +267,33 @@ fun MessageReceiver.handleUnsendRequest(message: UnsendRequest): Long? { return deletedMessageId } -fun handleMessageRequestResponse(message: MessageRequestResponse) { +fun handleMessageRequestResponse(message: MessageRequestResponse, threadId: Long) { + // Only process the message if the thread is not archived or it was sent after the libSession buffer period + val storage = MessagingModuleConfiguration.shared.storage + val userPublicKey = storage.getUserPublicKey()!! + val recipient = storage.getRecipientForThread(threadId) + val dbThreadIsVisible = ( + threadId > 0 && + recipient != null && + !recipient.isContactRecipient && + !storage.getThreadArchived(threadId) + ) + + if ( + !dbThreadIsVisible && + !storage.conversationInConfig( + recipient?.address?.serialize(), + null, + null, + true + ) && + !storage.canPerformConfigChange( + SharedConfigMessage.Kind.CONTACTS.name, + userPublicKey, + message.sentTimestamp!! + ) + ) { return } + MessagingModuleConfiguration.shared.storage.insertMessageRequestResponse(message) } //endregion @@ -224,20 +302,45 @@ fun MessageReceiver.handleVisibleMessage( message: VisibleMessage, proto: SignalServiceProtos.Content, openGroupID: String?, + threadId: Long, runThreadUpdate: Boolean, runProfileUpdate: Boolean ): Long? { val storage = MessagingModuleConfiguration.shared.storage val context = MessagingModuleConfiguration.shared.context - val userPublicKey = storage.getUserPublicKey() + val userPublicKey = storage.getUserPublicKey()!! val messageSender: String? = message.sender + + // Only process the message if the thread is not archived or it was sent after the libSession buffer period + val threadRecipient = storage.getRecipientForThread(threadId) + val dbThreadIsVisible = ( + threadId > 0 && + threadRecipient != null && + !threadRecipient.isContactRecipient && + !storage.getThreadArchived(threadId) + ) + + if ( + !dbThreadIsVisible && + !storage.conversationInConfig( + if (message.groupPublicKey == null) threadRecipient?.address?.serialize() else null, + message.groupPublicKey, + openGroupID, + true + ) && + !storage.canPerformConfigChange( + if (threadRecipient?.address?.serialize() == userPublicKey) SharedConfigMessage.Kind.USER_PROFILE.name else SharedConfigMessage.Kind.CONTACTS.name, + userPublicKey, + message.sentTimestamp!! + ) + ) { return null } + // Get or create thread // FIXME: In case this is an open group this actually * doesn't * create the thread if it doesn't yet // exist. This is intentional, but it's very non-obvious. val threadID = storage.getThreadIdFor(message.syncTarget ?: messageSender!!, message.groupPublicKey, openGroupID, createThread = true) // Thread doesn't exist; should only be reached in a case where we are processing open group messages for a no longer existent thread ?: throw MessageReceiver.Error.NoThread - val threadRecipient = storage.getRecipientForThread(threadID) val userBlindedKey = openGroupID?.let { val openGroup = storage.getOpenGroup(threadID) ?: return@let null val blindedKey = SodiumUtilities.blindedKeyPair(openGroup.publicKey, MessagingModuleConfiguration.shared.getUserED25519KeyPair()!!) ?: return@let null diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt index b9ce8da6ac..3a14a00734 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt @@ -12,6 +12,7 @@ import org.session.libsession.messaging.jobs.MessageReceiveJob import org.session.libsession.messaging.jobs.MessageReceiveParameters import org.session.libsession.messaging.jobs.OpenGroupDeleteJob import org.session.libsession.messaging.jobs.TrimThreadJob +import org.session.libsession.messaging.messages.Message import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate import org.session.libsession.messaging.messages.visible.VisibleMessage import org.session.libsession.messaging.open_groups.Endpoint @@ -261,7 +262,8 @@ class OpenGroupPoller(private val server: String, private val executorService: S } mappingCache[it.recipient] = mapping } - MessageReceiver.handle(message, proto, null) + val threadId = Message.getThreadId(message, null, MessagingModuleConfiguration.shared.storage, false) + MessageReceiver.handle(message, proto, threadId ?: -1, null) } catch (e: Exception) { Log.e("Loki", "Couldn't handle direct message", e) } diff --git a/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt b/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt index cad4141186..a45fcd6761 100644 --- a/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt @@ -13,6 +13,8 @@ interface ConfigFactoryProtocol { val userGroups: UserGroupsConfig? fun getUserConfigs(): List fun persist(forConfigObject: ConfigBase, timestamp: Long) + + fun conversationInConfig(publicKey: String?, groupPublicKey: String?, openGroupId: String?, visibleOnly: Boolean): Boolean fun canPerformChange(variant: String, publicKey: String, changeTimestampMs: Long): Boolean }