feat: batching poll responses properly and handling groups properly

This commit is contained in:
0x330a 2023-02-21 13:26:19 +11:00
parent 8c512c7b6e
commit 621e564023
No known key found for this signature in database
GPG Key ID: 267811D6E6A2698C
5 changed files with 54 additions and 27 deletions

View File

@ -133,7 +133,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper, private val configF
configFactory.convoVolatile?.let { config -> configFactory.convoVolatile?.let { config ->
val convo = when { val convo = when {
// recipient closed group // recipient closed group
recipient.isClosedGroupRecipient -> config.getOrConstructLegacyClosedGroup(recipient.address.serialize()) recipient.isClosedGroupRecipient -> config.getOrConstructLegacyClosedGroup(GroupUtil.doubleDecodeGroupId(recipient.address.serialize()))
// recipient is open group // recipient is open group
recipient.isOpenGroupRecipient -> { recipient.isOpenGroupRecipient -> {
val openGroupJoinUrl = getOpenGroup(threadId)?.joinURL ?: return val openGroupJoinUrl = getOpenGroup(threadId)?.joinURL ?: return

View File

@ -1,11 +1,11 @@
package org.thoughtcrime.securesms.home package org.thoughtcrime.securesms.home
import android.content.BroadcastReceiver import android.content.BroadcastReceiver
import android.content.ClipData
import android.content.ClipboardManager
import android.content.Context import android.content.Context
import android.content.Intent import android.content.Intent
import android.content.IntentFilter import android.content.IntentFilter
import android.content.ClipData
import android.content.ClipboardManager
import android.os.Bundle import android.os.Bundle
import android.text.SpannableString import android.text.SpannableString
import android.widget.Toast import android.widget.Toast
@ -33,6 +33,7 @@ import org.greenrobot.eventbus.ThreadMode
import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.sending_receiving.MessageSender import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.utilities.Address import org.session.libsession.utilities.Address
import org.session.libsession.utilities.GroupUtil import org.session.libsession.utilities.GroupUtil
import org.session.libsession.utilities.ProfilePictureModifiedEvent import org.session.libsession.utilities.ProfilePictureModifiedEvent
@ -577,7 +578,7 @@ class HomeActivity : PassphraseRequiredActionBarActivity(),
private fun markAllAsRead(thread: ThreadRecord) { private fun markAllAsRead(thread: ThreadRecord) {
ThreadUtils.queue { ThreadUtils.queue {
MessagingModuleConfiguration.shared.storage.markConversationAsRead(thread.threadId, System.currentTimeMillis()) MessagingModuleConfiguration.shared.storage.markConversationAsRead(thread.threadId, SnodeAPI.nowWithOffset)
} }
} }

View File

@ -65,7 +65,8 @@ object ConfigurationMessageUtilities {
// schedule job if none exist // schedule job if none exist
// don't schedule job if we already have one // don't schedule job if we already have one
val ourDestination = Destination.Contact(userPublicKey) val ourDestination = Destination.Contact(userPublicKey)
if (storage.getConfigSyncJob(ourDestination) != null) return Promise.ofFail(NullPointerException("A job is already pending or in progress, don't schedule another job")) val currentStorageJob = storage.getConfigSyncJob(ourDestination)
if (currentStorageJob != null && !(currentStorageJob as ConfigurationSyncJob).isRunning.get()) return Promise.ofFail(NullPointerException("A job is already pending or in progress, don't schedule another job"))
val newConfigSync = ConfigurationSyncJob(ourDestination) val newConfigSync = ConfigurationSyncJob(ourDestination)
Log.d("Loki", "Scheduling new ConfigurationSyncJob") Log.d("Loki", "Scheduling new ConfigurationSyncJob")
JobQueue.shared.add(newConfigSync) JobQueue.shared.add(newConfigSync)

View File

@ -11,6 +11,7 @@ import org.session.libsession.messaging.utilities.Data
import org.session.libsession.snode.RawResponse import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeAPI
import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Log
import java.util.concurrent.atomic.AtomicBoolean
// only contact (self) and closed group destinations will be supported // only contact (self) and closed group destinations will be supported
data class ConfigurationSyncJob(val destination: Destination): Job { data class ConfigurationSyncJob(val destination: Destination): Job {
@ -20,7 +21,16 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
override var failureCount: Int = 0 override var failureCount: Int = 0
override val maxFailureCount: Int = 1 override val maxFailureCount: Int = 1
override suspend fun execute(dispatcherName: String) { val isRunning = AtomicBoolean(false)
suspend fun wrap(body: suspend ()->Unit) {
isRunning.set(true)
body()
isRunning.set(false)
}
override suspend fun execute(dispatcherName: String) = wrap {
val userEdKeyPair = MessagingModuleConfiguration.shared.getUserED25519KeyPair() val userEdKeyPair = MessagingModuleConfiguration.shared.getUserED25519KeyPair()
val userPublicKey = MessagingModuleConfiguration.shared.storage.getUserPublicKey() val userPublicKey = MessagingModuleConfiguration.shared.storage.getUserPublicKey()
val delegate = delegate val delegate = delegate
@ -37,8 +47,7 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
|| (destination is Destination.Contact && destination.publicKey != userPublicKey) || (destination is Destination.Contact && destination.publicKey != userPublicKey)
) { ) {
Log.w(TAG, "No need to run config sync job, TODO") Log.w(TAG, "No need to run config sync job, TODO")
delegate?.handleJobSucceeded(this, dispatcherName) return@wrap delegate?.handleJobSucceeded(this, dispatcherName) ?: Unit
return
} }
// configFactory singleton instance will come in handy for modifying hashes and fetching configs for namespace etc // configFactory singleton instance will come in handy for modifying hashes and fetching configs for namespace etc
@ -52,7 +61,7 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
).filter { config -> config.needsPush() } ).filter { config -> config.needsPush() }
// don't run anything if we don't need to push anything // don't run anything if we don't need to push anything
if (configsRequiringPush.isEmpty()) return delegate.handleJobSucceeded(this, dispatcherName) if (configsRequiringPush.isEmpty()) return@wrap delegate.handleJobSucceeded(this, dispatcherName)
// allow null results here so the list index matches configsRequiringPush // allow null results here so the list index matches configsRequiringPush
val batchObjects: List<Pair<SharedConfigurationMessage, SnodeAPI.SnodeBatchRequestInfo>?> = configsRequiringPush.map { config -> val batchObjects: List<Pair<SharedConfigurationMessage, SnodeAPI.SnodeBatchRequestInfo>?> = configsRequiringPush.map { config ->
@ -79,7 +88,7 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
if (batchObjects.any { it == null }) { if (batchObjects.any { it == null }) {
// stop running here, something like a signing error occurred // stop running here, something like a signing error occurred
return delegate.handleJobFailedPermanently(this, dispatcherName, NullPointerException("One or more requests had a null batch request info")) return@wrap delegate.handleJobFailedPermanently(this, dispatcherName, NullPointerException("One or more requests had a null batch request info"))
} }
val allRequests = mutableListOf<SnodeAPI.SnodeBatchRequestInfo>() val allRequests = mutableListOf<SnodeAPI.SnodeBatchRequestInfo>()
@ -145,7 +154,7 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
} }
} catch (e: Exception) { } catch (e: Exception) {
Log.e(TAG, "Error performing batch request", e) Log.e(TAG, "Error performing batch request", e)
return delegate.handleJobFailedPermanently(this, dispatcherName, e) return@wrap delegate.handleJobFailedPermanently(this, dispatcherName, e)
} }
delegate.handleJobSucceeded(this, dispatcherName) delegate.handleJobSucceeded(this, dispatcherName)
} }

View File

@ -194,11 +194,28 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti
if (deferred.promise.isDone()) { if (deferred.promise.isDone()) {
return@bind Promise.ofSuccess(Unit) return@bind Promise.ofSuccess(Unit)
} else { } else {
val requestList = (rawResponses["results"] as List<RawResponse>) val responseList = (rawResponses["results"] as List<RawResponse>)
// the first response will be the personal messages
val personalResponseIndex = requestSparseArray.indexOfKey(Namespace.DEFAULT)
if (personalResponseIndex >= 0) {
responseList.getOrNull(personalResponseIndex)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) {
Log.e("Loki-DBG", "Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
} else {
val body = rawResponse["body"] as? RawResponse
if (body == null) {
Log.e("Loki-DBG", "Batch sub-request for personal messages didn't contain a body")
} else {
processPersonalMessages(snode, body)
}
}
}
}
// in case we had null configs, the array won't be fully populated // in case we had null configs, the array won't be fully populated
// index of the sparse array key iterator should be the request index, with the key being the namespace // index of the sparse array key iterator should be the request index, with the key being the namespace
configDebouncer.publish {
requestSparseArray.keyIterator().withIndex().forEach { (requestIndex, key) -> requestSparseArray.keyIterator().withIndex().forEach { (requestIndex, key) ->
requestList.getOrNull(requestIndex)?.let { rawResponse -> responseList.getOrNull(requestIndex)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) { if (rawResponse["code"] as? Int != 200) {
Log.e("Loki-DBG", "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") Log.e("Loki-DBG", "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
return@forEach return@forEach
@ -209,9 +226,8 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti
return@forEach return@forEach
} }
if (key == Namespace.DEFAULT) { if (key == Namespace.DEFAULT) {
processPersonalMessages(snode, body) return@forEach // continue, skip default namespace
} else { } else {
configDebouncer.publish {
when (ConfigBase.kindFor(key)) { when (ConfigBase.kindFor(key)) {
UserProfile::class.java -> processConfig(snode, body, key, configFactory.user) UserProfile::class.java -> processConfig(snode, body, key, configFactory.user)
Contacts::class.java -> processConfig(snode, body, key, configFactory.contacts) Contacts::class.java -> processConfig(snode, body, key, configFactory.contacts)