From 621e564023782e5682365541624619e202dad3a1 Mon Sep 17 00:00:00 2001 From: 0x330a <92654767+0x330a@users.noreply.github.com> Date: Tue, 21 Feb 2023 13:26:19 +1100 Subject: [PATCH] feat: batching poll responses properly and handling groups properly --- .../securesms/database/Storage.kt | 2 +- .../securesms/home/HomeActivity.kt | 7 +-- .../util/ConfigurationMessageUtilities.kt | 3 +- .../messaging/jobs/ConfigurationSyncJob.kt | 21 +++++--- .../sending_receiving/pollers/Poller.kt | 48 ++++++++++++------- 5 files changed, 54 insertions(+), 27 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index 9b80618115..1e6395cbac 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -133,7 +133,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper, private val configF configFactory.convoVolatile?.let { config -> val convo = when { // recipient closed group - recipient.isClosedGroupRecipient -> config.getOrConstructLegacyClosedGroup(recipient.address.serialize()) + recipient.isClosedGroupRecipient -> config.getOrConstructLegacyClosedGroup(GroupUtil.doubleDecodeGroupId(recipient.address.serialize())) // recipient is open group recipient.isOpenGroupRecipient -> { val openGroupJoinUrl = getOpenGroup(threadId)?.joinURL ?: return diff --git a/app/src/main/java/org/thoughtcrime/securesms/home/HomeActivity.kt b/app/src/main/java/org/thoughtcrime/securesms/home/HomeActivity.kt index c98db73a74..e505702a2f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/home/HomeActivity.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/home/HomeActivity.kt @@ -1,11 +1,11 @@ package org.thoughtcrime.securesms.home import android.content.BroadcastReceiver +import android.content.ClipData +import android.content.ClipboardManager import android.content.Context import android.content.Intent import android.content.IntentFilter -import android.content.ClipData -import android.content.ClipboardManager import android.os.Bundle import android.text.SpannableString import android.widget.Toast @@ -33,6 +33,7 @@ import org.greenrobot.eventbus.ThreadMode import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.sending_receiving.MessageSender +import org.session.libsession.snode.SnodeAPI import org.session.libsession.utilities.Address import org.session.libsession.utilities.GroupUtil import org.session.libsession.utilities.ProfilePictureModifiedEvent @@ -577,7 +578,7 @@ class HomeActivity : PassphraseRequiredActionBarActivity(), private fun markAllAsRead(thread: ThreadRecord) { ThreadUtils.queue { - MessagingModuleConfiguration.shared.storage.markConversationAsRead(thread.threadId, System.currentTimeMillis()) + MessagingModuleConfiguration.shared.storage.markConversationAsRead(thread.threadId, SnodeAPI.nowWithOffset) } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/ConfigurationMessageUtilities.kt b/app/src/main/java/org/thoughtcrime/securesms/util/ConfigurationMessageUtilities.kt index 32d2d026ba..4288e4d962 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/util/ConfigurationMessageUtilities.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/util/ConfigurationMessageUtilities.kt @@ -65,7 +65,8 @@ object ConfigurationMessageUtilities { // schedule job if none exist // don't schedule job if we already have one val ourDestination = Destination.Contact(userPublicKey) - if (storage.getConfigSyncJob(ourDestination) != null) return Promise.ofFail(NullPointerException("A job is already pending or in progress, don't schedule another job")) + val currentStorageJob = storage.getConfigSyncJob(ourDestination) + if (currentStorageJob != null && !(currentStorageJob as ConfigurationSyncJob).isRunning.get()) return Promise.ofFail(NullPointerException("A job is already pending or in progress, don't schedule another job")) val newConfigSync = ConfigurationSyncJob(ourDestination) Log.d("Loki", "Scheduling new ConfigurationSyncJob") JobQueue.shared.add(newConfigSync) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/ConfigurationSyncJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/ConfigurationSyncJob.kt index 7f4f64944b..bc09376d6e 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/ConfigurationSyncJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/ConfigurationSyncJob.kt @@ -11,6 +11,7 @@ import org.session.libsession.messaging.utilities.Data import org.session.libsession.snode.RawResponse import org.session.libsession.snode.SnodeAPI import org.session.libsignal.utilities.Log +import java.util.concurrent.atomic.AtomicBoolean // only contact (self) and closed group destinations will be supported data class ConfigurationSyncJob(val destination: Destination): Job { @@ -20,7 +21,16 @@ data class ConfigurationSyncJob(val destination: Destination): Job { override var failureCount: Int = 0 override val maxFailureCount: Int = 1 - override suspend fun execute(dispatcherName: String) { + val isRunning = AtomicBoolean(false) + + suspend fun wrap(body: suspend ()->Unit) { + isRunning.set(true) + body() + isRunning.set(false) + } + + + override suspend fun execute(dispatcherName: String) = wrap { val userEdKeyPair = MessagingModuleConfiguration.shared.getUserED25519KeyPair() val userPublicKey = MessagingModuleConfiguration.shared.storage.getUserPublicKey() val delegate = delegate @@ -37,8 +47,7 @@ data class ConfigurationSyncJob(val destination: Destination): Job { || (destination is Destination.Contact && destination.publicKey != userPublicKey) ) { Log.w(TAG, "No need to run config sync job, TODO") - delegate?.handleJobSucceeded(this, dispatcherName) - return + return@wrap delegate?.handleJobSucceeded(this, dispatcherName) ?: Unit } // configFactory singleton instance will come in handy for modifying hashes and fetching configs for namespace etc @@ -52,7 +61,7 @@ data class ConfigurationSyncJob(val destination: Destination): Job { ).filter { config -> config.needsPush() } // don't run anything if we don't need to push anything - if (configsRequiringPush.isEmpty()) return delegate.handleJobSucceeded(this, dispatcherName) + if (configsRequiringPush.isEmpty()) return@wrap delegate.handleJobSucceeded(this, dispatcherName) // allow null results here so the list index matches configsRequiringPush val batchObjects: List?> = configsRequiringPush.map { config -> @@ -79,7 +88,7 @@ data class ConfigurationSyncJob(val destination: Destination): Job { if (batchObjects.any { it == null }) { // stop running here, something like a signing error occurred - return delegate.handleJobFailedPermanently(this, dispatcherName, NullPointerException("One or more requests had a null batch request info")) + return@wrap delegate.handleJobFailedPermanently(this, dispatcherName, NullPointerException("One or more requests had a null batch request info")) } val allRequests = mutableListOf() @@ -145,7 +154,7 @@ data class ConfigurationSyncJob(val destination: Destination): Job { } } catch (e: Exception) { Log.e(TAG, "Error performing batch request", e) - return delegate.handleJobFailedPermanently(this, dispatcherName, e) + return@wrap delegate.handleJobFailedPermanently(this, dispatcherName, e) } delegate.handleJobSucceeded(this, dispatcherName) } diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt index 6166e32da3..bde22a5dac 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt @@ -194,24 +194,40 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti if (deferred.promise.isDone()) { return@bind Promise.ofSuccess(Unit) } else { - val requestList = (rawResponses["results"] as List) + val responseList = (rawResponses["results"] as List) + // the first response will be the personal messages + val personalResponseIndex = requestSparseArray.indexOfKey(Namespace.DEFAULT) + if (personalResponseIndex >= 0) { + responseList.getOrNull(personalResponseIndex)?.let { rawResponse -> + if (rawResponse["code"] as? Int != 200) { + Log.e("Loki-DBG", "Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") + } else { + val body = rawResponse["body"] as? RawResponse + if (body == null) { + Log.e("Loki-DBG", "Batch sub-request for personal messages didn't contain a body") + } else { + processPersonalMessages(snode, body) + } + } + } + } // in case we had null configs, the array won't be fully populated // index of the sparse array key iterator should be the request index, with the key being the namespace - requestSparseArray.keyIterator().withIndex().forEach { (requestIndex, key) -> - requestList.getOrNull(requestIndex)?.let { rawResponse -> - if (rawResponse["code"] as? Int != 200) { - Log.e("Loki-DBG", "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") - return@forEach - } - val body = rawResponse["body"] as? RawResponse - if (body == null) { - Log.e("Loki-DBG", "Batch sub-request didn't contain a body") - return@forEach - } - if (key == Namespace.DEFAULT) { - processPersonalMessages(snode, body) - } else { - configDebouncer.publish { + configDebouncer.publish { + requestSparseArray.keyIterator().withIndex().forEach { (requestIndex, key) -> + responseList.getOrNull(requestIndex)?.let { rawResponse -> + if (rawResponse["code"] as? Int != 200) { + Log.e("Loki-DBG", "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") + return@forEach + } + val body = rawResponse["body"] as? RawResponse + if (body == null) { + Log.e("Loki-DBG", "Batch sub-request didn't contain a body") + return@forEach + } + if (key == Namespace.DEFAULT) { + return@forEach // continue, skip default namespace + } else { when (ConfigBase.kindFor(key)) { UserProfile::class.java -> processConfig(snode, body, key, configFactory.user) Contacts::class.java -> processConfig(snode, body, key, configFactory.contacts)