From 82e3516a60d0d515c245fa862d7ceff3ca4fb5cf Mon Sep 17 00:00:00 2001 From: 0x330a <92654767+0x330a@users.noreply.github.com> Date: Wed, 13 Sep 2023 18:29:19 +1000 Subject: [PATCH] feat: basic polling happening with hashes merging --- .../securesms/ApplicationContext.java | 4 + .../securesms/database/Storage.kt | 10 ++- .../securesms/dependencies/DatabaseModule.kt | 9 +- .../securesms/dependencies/PollerFactory.kt | 41 +++++++++ .../dependencies/SessionUtilModule.kt | 17 ++++ .../pollers/ClosedGroupPoller.kt | 90 ++++++++++++++++--- 6 files changed, 156 insertions(+), 15 deletions(-) create mode 100644 app/src/main/java/org/thoughtcrime/securesms/dependencies/PollerFactory.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index 6a65f6a6e2..17ce42660f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -65,6 +65,7 @@ import org.thoughtcrime.securesms.dependencies.AppComponent; import org.thoughtcrime.securesms.dependencies.ConfigFactory; import org.thoughtcrime.securesms.dependencies.DatabaseComponent; import org.thoughtcrime.securesms.dependencies.DatabaseModule; +import org.thoughtcrime.securesms.dependencies.PollerFactory; import org.thoughtcrime.securesms.emoji.EmojiSource; import org.thoughtcrime.securesms.groups.OpenGroupManager; import org.thoughtcrime.securesms.home.HomeActivity; @@ -149,6 +150,7 @@ public class ApplicationContext extends Application implements DefaultLifecycleO @Inject TextSecurePreferences textSecurePreferences; @Inject PushRegistry pushRegistry; @Inject ConfigFactory configFactory; + @Inject PollerFactory pollerFactory; CallMessageProcessor callMessageProcessor; MessagingModuleConfiguration messagingModuleConfiguration; @@ -289,6 +291,7 @@ public class ApplicationContext extends Application implements DefaultLifecycleO public void onTerminate() { stopKovenant(); // Loki OpenGroupManager.INSTANCE.stopPolling(); + pollerFactory.stopAll(); super.onTerminate(); } @@ -445,6 +448,7 @@ public class ApplicationContext extends Application implements DefaultLifecycleO if (poller != null) { poller.startIfNeeded(); } + pollerFactory.startAll(); LegacyClosedGroupPollerV2.getShared().start(); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index c5d212d2dd..b0d9c0235f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -89,6 +89,7 @@ import org.thoughtcrime.securesms.database.model.MessageId import org.thoughtcrime.securesms.database.model.ReactionRecord import org.thoughtcrime.securesms.dependencies.ConfigFactory import org.thoughtcrime.securesms.dependencies.DatabaseComponent +import org.thoughtcrime.securesms.dependencies.PollerFactory import org.thoughtcrime.securesms.groups.ClosedGroupManager import org.thoughtcrime.securesms.groups.GroupManager import org.thoughtcrime.securesms.groups.OpenGroupManager @@ -99,7 +100,12 @@ import java.security.MessageDigest import network.loki.messenger.libsession_util.util.Contact as LibSessionContact import network.loki.messenger.libsession_util.util.GroupMember as LibSessionGroupMember -open class Storage(context: Context, helper: SQLCipherOpenHelper, private val configFactory: ConfigFactory) : Database(context, helper), StorageProtocol, +open class Storage( + context: Context, + helper: SQLCipherOpenHelper, + private val configFactory: ConfigFactory, + private val pollerFactory: PollerFactory +) : Database(context, helper), StorageProtocol, ThreadDatabase.ConversationThreadUpdateListener { override fun threadCreated(address: Address, threadId: Long) { @@ -617,6 +623,7 @@ open class Storage(context: Context, helper: SQLCipherOpenHelper, private val co setRecipientApproved(recipient, true) val threadId = getOrCreateThreadIdFor(recipient.address) setPinned(threadId, closedGroup.priority == PRIORITY_PINNED) + pollerFactory.pollerFor(closedGroup.groupSessionId)?.start() } for (group in lgc) { @@ -1003,6 +1010,7 @@ open class Storage(context: Context, helper: SQLCipherOpenHelper, private val co val groupRecipient = Recipient.from(context, Address.fromSerialized(newGroupRecipient), false) setRecipientApprovedMe(groupRecipient, true) setRecipientApproved(groupRecipient, true) + pollerFactory.updatePollers() return Optional.of(groupRecipient) } catch (e: Exception) { Log.e("Group Config", e) diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/DatabaseModule.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/DatabaseModule.kt index 6580c5c927..5bef072c35 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/DatabaseModule.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/DatabaseModule.kt @@ -7,7 +7,6 @@ import dagger.hilt.InstallIn import dagger.hilt.android.qualifiers.ApplicationContext import dagger.hilt.components.SingletonComponent import org.session.libsession.database.MessageDataProvider -import org.session.libsession.database.StorageProtocol import org.thoughtcrime.securesms.attachments.DatabaseAttachmentProvider import org.thoughtcrime.securesms.crypto.AttachmentSecret import org.thoughtcrime.securesms.crypto.AttachmentSecretProvider @@ -132,8 +131,12 @@ object DatabaseModule { @Provides @Singleton - fun provideStorage(@ApplicationContext context: Context, openHelper: SQLCipherOpenHelper, configFactory: ConfigFactory, threadDatabase: ThreadDatabase): Storage { - val storage = Storage(context,openHelper, configFactory) + fun provideStorage(@ApplicationContext context: Context, + openHelper: SQLCipherOpenHelper, + configFactory: ConfigFactory, + threadDatabase: ThreadDatabase, + pollerFactory: PollerFactory): Storage { + val storage = Storage(context, openHelper, configFactory, pollerFactory) threadDatabase.setUpdateListener(storage) return storage } diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/PollerFactory.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/PollerFactory.kt new file mode 100644 index 0000000000..5482743191 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/PollerFactory.kt @@ -0,0 +1,41 @@ +package org.thoughtcrime.securesms.dependencies + +import kotlinx.coroutines.CoroutineScope +import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller +import org.session.libsignal.utilities.SessionId +import java.util.concurrent.ConcurrentHashMap + +class PollerFactory(private val scope: CoroutineScope, private val configFactory: ConfigFactory) { + + private val pollers = ConcurrentHashMap() + + fun pollerFor(sessionId: SessionId): ClosedGroupPoller? { + val activeGroup = configFactory.userGroups?.getClosedGroup(sessionId.hexString()) ?: return null + // TODO: add check for active group being invited / approved etc + return pollers.getOrPut(sessionId) { + ClosedGroupPoller(scope, sessionId, configFactory) + } + } + + fun startAll() { + configFactory.userGroups?.allClosedGroupInfo()?.forEach { + pollerFor(it.groupSessionId)?.start() + } + } + + fun stopAll() { + pollers.forEach { (_, poller) -> + poller.stop() + } + } + + fun updatePollers() { + val currentGroups = configFactory.userGroups?.allClosedGroupInfo() ?: return + val toRemove = pollers.filter { (id, _) -> id !in currentGroups.map { it.groupSessionId } } + toRemove.forEach { (id, _) -> + pollers.remove(id)?.stop() + } + startAll() + } + +} \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt index cd4b071338..44b4386028 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt @@ -6,16 +6,23 @@ import dagger.Provides import dagger.hilt.InstallIn import dagger.hilt.android.qualifiers.ApplicationContext import dagger.hilt.components.SingletonComponent +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.plus import org.session.libsession.utilities.ConfigFactoryUpdateListener import org.session.libsession.utilities.TextSecurePreferences import org.thoughtcrime.securesms.crypto.KeyPairUtilities import org.thoughtcrime.securesms.database.ConfigDatabase +import javax.inject.Named import javax.inject.Singleton @Module @InstallIn(SingletonComponent::class) object SessionUtilModule { + const val POLLER_SCOPE = "poller_coroutine_scope" + private fun maybeUserEdSecretKey(context: Context): ByteArray? { val edKey = KeyPairUtilities.getUserED25519KeyPair(context) ?: return null return edKey.secretKey.asBytes @@ -33,4 +40,14 @@ object SessionUtilModule { registerListener(context as ConfigFactoryUpdateListener) } + @Provides + @Named(POLLER_SCOPE) + fun providePollerScope(@ApplicationContext applicationContext: Context) = + GlobalScope + SupervisorJob() + + @Provides + @Singleton + fun providePollerFactory(@Named(POLLER_SCOPE) coroutineScope: CoroutineScope, + configFactory: ConfigFactory) = PollerFactory(coroutineScope, configFactory) + } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt index 8397261c9b..45459c24e5 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt @@ -1,6 +1,7 @@ package org.session.libsession.messaging.sending_receiving.pollers import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -11,6 +12,7 @@ import network.loki.messenger.libsession_util.util.GroupInfo import org.session.libsession.snode.RawResponse import org.session.libsession.snode.SnodeAPI import org.session.libsession.utilities.ConfigFactoryProtocol +import org.session.libsignal.utilities.Base64 import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Namespace import org.session.libsignal.utilities.SessionId @@ -19,6 +21,32 @@ class ClosedGroupPoller(private val executor: CoroutineScope, private val closedGroupSessionId: SessionId, private val configFactoryProtocol: ConfigFactoryProtocol) { + data class ParsedRawMessage( + val data: ByteArray, + val hash: String, + val timestamp: Long + ) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + + other as ParsedRawMessage + + if (!data.contentEquals(other.data)) return false + if (hash != other.hash) return false + if (timestamp != other.timestamp) return false + + return true + } + + override fun hashCode(): Int { + var result = data.contentHashCode() + result = 31 * result + hash.hashCode() + result = 31 * result + timestamp.hashCode() + return result + } + } + companion object { const val POLL_INTERVAL = 3_000L } @@ -27,10 +55,14 @@ class ClosedGroupPoller(private val executor: CoroutineScope, private var job: Job? = null fun start() { + if (isRunning) return // already started, don't restart + + Log.d("ClosedGroupPoller", "Starting closed group poller for ${closedGroupSessionId.hexString().take(4)}") job?.cancel() - job = executor.launch { + job = executor.launch(Dispatchers.IO) { val closedGroups = configFactoryProtocol.userGroups?: return@launch - while (true) { + isRunning = true + while (isRunning) { val group = closedGroups.getClosedGroup(closedGroupSessionId.hexString()) ?: break val nextPoll = poll(group) if (nextPoll != null) { @@ -45,7 +77,9 @@ class ClosedGroupPoller(private val executor: CoroutineScope, } fun stop() { + isRunning = false job?.cancel() + job = null } fun poll(group: GroupInfo.ClosedGroupInfo): Long? { @@ -97,15 +131,21 @@ class ClosedGroupPoller(private val executor: CoroutineScope, // TODO: add the extend duration TTLs for known hashes here - (pollResult["body"] as List).forEachIndexed { index, response -> + // if poll result body is null here we don't have any things ig + (pollResult["results"] as List).forEachIndexed { index, response -> when (index) { keysIndex -> handleKeyPoll(response, keys, info, members) infoIndex -> handleInfo(response, info) membersIndex -> handleMembers(response, members) - messageIndex -> handleMessages(response) + messageIndex -> handleMessages(response, keys) } } + configFactoryProtocol.saveGroupConfigs(keys, info, members) + keys.free() + info.free() + members.free() + } catch (e: Exception) { Log.e("GroupPoller", "Polling failed for group", e) return POLL_INTERVAL @@ -113,27 +153,55 @@ class ClosedGroupPoller(private val executor: CoroutineScope, return POLL_INTERVAL // this might change in future } + private fun parseMessages(response: RawResponse): List { + val body = response["body"] as? RawResponse + if (body == null) { + Log.e("GroupPoller", "Batch parse messages contained no body!") + return emptyList() + } + val messages = body["messages"] as? List<*> ?: return emptyList() + return messages.mapNotNull { messageMap -> + val rawMessageAsJSON = messageMap as? Map<*, *> ?: return@mapNotNull null + val base64EncodedData = rawMessageAsJSON["data"] as? String ?: return@mapNotNull null + val hash = rawMessageAsJSON["hash"] as? String ?: return@mapNotNull null + val timestamp = rawMessageAsJSON["timestamp"] as? Long ?: return@mapNotNull null + val data = base64EncodedData.let { Base64.decode(it) } + ParsedRawMessage(data, hash, timestamp) + } + } + private fun handleKeyPoll(response: RawResponse, keysConfig: GroupKeysConfig, infoConfig: GroupInfoConfig, membersConfig: GroupMembersConfig) { - + // get all the data to hash objects and process them + parseMessages(response).forEach { (message, hash, timestamp) -> + keysConfig.loadKey(message, hash, timestamp, infoConfig, membersConfig) + Log.d("ClosedGroupPoller", "Merged $hash for keys") + } } private fun handleInfo(response: RawResponse, infoConfig: GroupInfoConfig) { - + parseMessages(response).forEach { (message, hash, _) -> + infoConfig.merge(hash to message) + Log.d("ClosedGroupPoller", "Merged $hash for info") + } } private fun handleMembers(response: RawResponse, membersConfig: GroupMembersConfig) { - + parseMessages(response).forEach { (message, hash, _) -> + membersConfig.merge(hash to message) + Log.d("ClosedGroupPoller", "Merged $hash for members") + } } - private fun handleMessages(response: RawResponse) { - // TODO + private fun handleMessages(response: RawResponse, keysConfig: GroupKeysConfig) { + val messages = parseMessages(response) + if (messages.isNotEmpty()) { + // TODO: process decrypting bundles + } } - - } \ No newline at end of file