From b0edfef6a9df14d9465b862918edd13723401650 Mon Sep 17 00:00:00 2001 From: Ryan ZHAO <> Date: Wed, 29 May 2024 15:34:45 +1000 Subject: [PATCH 1/4] fix an issue when onboarding gets stuck in 421 loop when trying to fetch UserProfile config --- .../libsession/messaging/sending_receiving/pollers/Poller.kt | 4 ++++ .../src/main/java/org/session/libsession/snode/SnodeAPI.kt | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt index 39ed79de1e..6b9186708a 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt @@ -251,6 +251,10 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti responseList.getOrNull(personalResponseIndex)?.let { rawResponse -> if (rawResponse["code"] as? Int != 200) { Log.e("Loki", "Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") + // If we got a non-success response then the snode might be bad so we should try rotate + // to a different one just in case + pollNextSnode(deferred) + return@bind Promise.ofSuccess(Unit) } else { val body = rawResponse["body"] as? RawResponse if (body == null) { diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index 0f996bacac..e774d87a55 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -520,7 +520,7 @@ object SnodeAPI { Log.w("Loki", "response code was not 200") handleSnodeError( response["code"] as? Int ?: 0, - response, + response["body"] as? Map<*, *>, snode, publicKey ) From a37039cebf36feeb338cec3a311ed594502ac061 Mon Sep 17 00:00:00 2001 From: Ryan ZHAO <> Date: Thu, 30 May 2024 16:08:05 +1000 Subject: [PATCH 2/4] retrieve user profile only during onboarding --- .../securesms/ApplicationContext.java | 7 ++ .../securesms/onboarding/LoadingViewModel.kt | 2 +- .../sending_receiving/pollers/Poller.kt | 81 +++++++++++++++++-- 3 files changed, 84 insertions(+), 6 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index 3708915700..c2f6877713 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -454,6 +454,13 @@ public class ApplicationContext extends Application implements DefaultLifecycleO ClosedGroupPollerV2.getShared().start(); } + public void retrieveUserProfile() { + setUpPollingIfNeeded(); + if (poller != null) { + poller.retrieveUserProfile(); + } + } + private void resubmitProfilePictureIfNeeded() { // Files expire on the file server after a while, so we simply re-upload the user's profile picture // at a certain interval to ensure it's always available. diff --git a/app/src/main/java/org/thoughtcrime/securesms/onboarding/LoadingViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/onboarding/LoadingViewModel.kt index bf50baa736..0d012e07f3 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/onboarding/LoadingViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/onboarding/LoadingViewModel.kt @@ -82,7 +82,7 @@ class LoadingViewModel @Inject constructor( } // start polling and wait for updated message - ApplicationContext.getInstance(context).apply { startPollingIfNeeded() } + ApplicationContext.getInstance(context).apply { retrieveUserProfile() } TextSecurePreferences.events.filter { it == TextSecurePreferences.CONFIGURATION_SYNCED }.collect { // handle we've synced skipJob.cancel() diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt index 6b9186708a..ed57ec6120 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt @@ -61,6 +61,16 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti hasStarted = false usedSnodes.clear() } + + fun retrieveUserProfile() { + Log.d("Loki", "Retrieving user profile.") + SnodeAPI.getSwarm(userPublicKey).bind { + usedSnodes.clear() + val deferred = deferred() + pollNextSnode(userProfileOnly = true, deferred) + deferred.promise + } + } // endregion // region Private API @@ -70,7 +80,7 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti SnodeAPI.getSwarm(userPublicKey).bind { usedSnodes.clear() val deferred = deferred() - pollNextSnode(deferred) + pollNextSnode(deferred = deferred) deferred.promise }.success { val nextDelay = if (isCaughtUp) retryInterval else 0 @@ -89,7 +99,7 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti } } - private fun pollNextSnode(deferred: Deferred) { + private fun pollNextSnode(userProfileOnly: Boolean = false, deferred: Deferred) { val swarm = SnodeModule.shared.storage.getSwarm(userPublicKey) ?: setOf() val unusedSnodes = swarm.subtract(usedSnodes) if (unusedSnodes.isNotEmpty()) { @@ -97,13 +107,13 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti val nextSnode = unusedSnodes.elementAt(index) usedSnodes.add(nextSnode) Log.d("Loki", "Polling $nextSnode.") - poll(nextSnode, deferred).fail { exception -> + poll(userProfileOnly, nextSnode, deferred).fail { exception -> if (exception is PromiseCanceledException) { Log.d("Loki", "Polling $nextSnode canceled.") } else { Log.d("Loki", "Polling $nextSnode failed; dropping it and switching to next snode.") SnodeAPI.dropSnodeFromSwarmIfNeeded(nextSnode, userPublicKey) - pollNextSnode(deferred) + pollNextSnode(userProfileOnly, deferred) } } } else { @@ -168,6 +178,67 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti } } + private fun poll(userProfileOnly: Boolean, snode: Snode, deferred: Deferred): Promise { + if (userProfileOnly) { + return pollUserProfile(snode, deferred) + } + return poll(snode, deferred) + } + + private fun pollUserProfile(snode: Snode, deferred: Deferred): Promise { + return task { + runBlocking(Dispatchers.IO) { + val requests = mutableListOf() + val hashesToExtend = mutableSetOf() + configFactory.user?.let { config -> + hashesToExtend += config.currentHashes() + SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + snode, userPublicKey, + config.configNamespace(), + maxSize = -8 + ) + }?.let { request -> + requests += request + } + + if (hashesToExtend.isNotEmpty()) { + SnodeAPI.buildAuthenticatedAlterTtlBatchRequest( + messageHashes = hashesToExtend.toList(), + publicKey = userPublicKey, + newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds, + extend = true + )?.let { extensionRequest -> + requests += extensionRequest + } + } + + SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses -> + isCaughtUp = true + if (deferred.promise.isDone()) { + return@bind Promise.ofSuccess(Unit) + } else { + val responseList = (rawResponses["results"] as List) + responseList.getOrNull(0)?.let { rawResponse -> + if (rawResponse["code"] as? Int != 200) { + Log.e("Loki", "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") + return@bind Promise.ofSuccess(Unit) + } + val body = rawResponse["body"] as? RawResponse + if (body == null) { + Log.e("Loki", "Batch sub-request didn't contain a body") + return@bind Promise.ofSuccess(Unit) + } + processConfig(snode, body, configFactory.user!!.configNamespace(), configFactory.user) + } + return@bind Promise.ofSuccess(Unit) + } + }.fail { + Log.e("Loki", "Failed to get raw batch response", it) + } + } + } + } + private fun poll(snode: Snode, deferred: Deferred): Promise { if (!hasStarted) { return Promise.ofFail(PromiseCanceledException()) } return task { @@ -253,7 +324,7 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti Log.e("Loki", "Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") // If we got a non-success response then the snode might be bad so we should try rotate // to a different one just in case - pollNextSnode(deferred) + pollNextSnode(deferred = deferred) return@bind Promise.ofSuccess(Unit) } else { val body = rawResponse["body"] as? RawResponse From 9e025f1b9d0a5b14f35ae966c76d48c1ea7e842e Mon Sep 17 00:00:00 2001 From: Ryan ZHAO <> Date: Mon, 3 Jun 2024 10:19:06 +1000 Subject: [PATCH 3/4] clean --- .../sending_receiving/pollers/Poller.kt | 166 +++++++++--------- 1 file changed, 84 insertions(+), 82 deletions(-) diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt index ed57ec6120..4b41fd374f 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt @@ -66,9 +66,9 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti Log.d("Loki", "Retrieving user profile.") SnodeAPI.getSwarm(userPublicKey).bind { usedSnodes.clear() - val deferred = deferred() - pollNextSnode(userProfileOnly = true, deferred) - deferred.promise + deferred().also { + pollNextSnode(userProfileOnly = true, it) + }.promise } } // endregion @@ -185,33 +185,33 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti return poll(snode, deferred) } - private fun pollUserProfile(snode: Snode, deferred: Deferred): Promise { - return task { - runBlocking(Dispatchers.IO) { - val requests = mutableListOf() - val hashesToExtend = mutableSetOf() - configFactory.user?.let { config -> - hashesToExtend += config.currentHashes() - SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - snode, userPublicKey, - config.configNamespace(), - maxSize = -8 - ) - }?.let { request -> - requests += request - } + private fun pollUserProfile(snode: Snode, deferred: Deferred): Promise = task { + runBlocking(Dispatchers.IO) { + val requests = mutableListOf() + val hashesToExtend = mutableSetOf() + configFactory.user?.let { config -> + hashesToExtend += config.currentHashes() + SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + snode, userPublicKey, + config.configNamespace(), + maxSize = -8 + ) + }?.let { request -> + requests += request + } - if (hashesToExtend.isNotEmpty()) { - SnodeAPI.buildAuthenticatedAlterTtlBatchRequest( - messageHashes = hashesToExtend.toList(), - publicKey = userPublicKey, - newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds, - extend = true - )?.let { extensionRequest -> - requests += extensionRequest - } + if (hashesToExtend.isNotEmpty()) { + SnodeAPI.buildAuthenticatedAlterTtlBatchRequest( + messageHashes = hashesToExtend.toList(), + publicKey = userPublicKey, + newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds, + extend = true + )?.let { extensionRequest -> + requests += extensionRequest } + } + if (requests.isNotEmpty()) { SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses -> isCaughtUp = true if (deferred.promise.isDone()) { @@ -277,71 +277,73 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti } } - SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses -> - isCaughtUp = true - if (deferred.promise.isDone()) { - return@bind Promise.ofSuccess(Unit) - } else { - val responseList = (rawResponses["results"] as List) - // in case we had null configs, the array won't be fully populated - // index of the sparse array key iterator should be the request index, with the key being the namespace - listOfNotNull( - configFactory.user?.configNamespace(), - configFactory.contacts?.configNamespace(), - configFactory.userGroups?.configNamespace(), - configFactory.convoVolatile?.configNamespace() - ).map { - it to requestSparseArray.indexOfKey(it) - }.filter { (_, i) -> i >= 0 }.forEach { (key, requestIndex) -> - responseList.getOrNull(requestIndex)?.let { rawResponse -> - if (rawResponse["code"] as? Int != 200) { - Log.e("Loki", "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") - return@forEach - } - val body = rawResponse["body"] as? RawResponse - if (body == null) { - Log.e("Loki", "Batch sub-request didn't contain a body") - return@forEach - } - if (key == Namespace.DEFAULT) { - return@forEach // continue, skip default namespace - } else { - when (ConfigBase.kindFor(key)) { - UserProfile::class.java -> processConfig(snode, body, key, configFactory.user) - Contacts::class.java -> processConfig(snode, body, key, configFactory.contacts) - ConversationVolatileConfig::class.java -> processConfig(snode, body, key, configFactory.convoVolatile) - UserGroupsConfig::class.java -> processConfig(snode, body, key, configFactory.userGroups) + if (requests.isNotEmpty()) { + SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses -> + isCaughtUp = true + if (deferred.promise.isDone()) { + return@bind Promise.ofSuccess(Unit) + } else { + val responseList = (rawResponses["results"] as List) + // in case we had null configs, the array won't be fully populated + // index of the sparse array key iterator should be the request index, with the key being the namespace + listOfNotNull( + configFactory.user?.configNamespace(), + configFactory.contacts?.configNamespace(), + configFactory.userGroups?.configNamespace(), + configFactory.convoVolatile?.configNamespace() + ).map { + it to requestSparseArray.indexOfKey(it) + }.filter { (_, i) -> i >= 0 }.forEach { (key, requestIndex) -> + responseList.getOrNull(requestIndex)?.let { rawResponse -> + if (rawResponse["code"] as? Int != 200) { + Log.e("Loki", "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") + return@forEach } - } - } - } - - // the first response will be the personal messages (we want these to be processed after config messages) - val personalResponseIndex = requestSparseArray.indexOfKey(Namespace.DEFAULT) - if (personalResponseIndex >= 0) { - responseList.getOrNull(personalResponseIndex)?.let { rawResponse -> - if (rawResponse["code"] as? Int != 200) { - Log.e("Loki", "Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") - // If we got a non-success response then the snode might be bad so we should try rotate - // to a different one just in case - pollNextSnode(deferred = deferred) - return@bind Promise.ofSuccess(Unit) - } else { val body = rawResponse["body"] as? RawResponse if (body == null) { - Log.e("Loki", "Batch sub-request for personal messages didn't contain a body") + Log.e("Loki", "Batch sub-request didn't contain a body") + return@forEach + } + if (key == Namespace.DEFAULT) { + return@forEach // continue, skip default namespace } else { - processPersonalMessages(snode, body) + when (ConfigBase.kindFor(key)) { + UserProfile::class.java -> processConfig(snode, body, key, configFactory.user) + Contacts::class.java -> processConfig(snode, body, key, configFactory.contacts) + ConversationVolatileConfig::class.java -> processConfig(snode, body, key, configFactory.convoVolatile) + UserGroupsConfig::class.java -> processConfig(snode, body, key, configFactory.userGroups) + } } } } - } + // the first response will be the personal messages (we want these to be processed after config messages) + val personalResponseIndex = requestSparseArray.indexOfKey(Namespace.DEFAULT) + if (personalResponseIndex >= 0) { + responseList.getOrNull(personalResponseIndex)?.let { rawResponse -> + if (rawResponse["code"] as? Int != 200) { + Log.e("Loki", "Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") + // If we got a non-success response then the snode might be bad so we should try rotate + // to a different one just in case + pollNextSnode(deferred = deferred) + return@bind Promise.ofSuccess(Unit) + } else { + val body = rawResponse["body"] as? RawResponse + if (body == null) { + Log.e("Loki", "Batch sub-request for personal messages didn't contain a body") + } else { + processPersonalMessages(snode, body) + } + } + } + } + + poll(snode, deferred) + } + }.fail { + Log.e("Loki", "Failed to get raw batch response", it) poll(snode, deferred) } - }.fail { - Log.e("Loki", "Failed to get raw batch response", it) - poll(snode, deferred) } } } From dc347d937c1bd08b2123bc0111e539fdf411ef2f Mon Sep 17 00:00:00 2001 From: Ryan ZHAO <> Date: Mon, 3 Jun 2024 11:16:29 +1000 Subject: [PATCH 4/4] clean --- .../sending_receiving/pollers/Poller.kt | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt index 4b41fd374f..de9648f5e6 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt @@ -214,24 +214,22 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti if (requests.isNotEmpty()) { SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses -> isCaughtUp = true - if (deferred.promise.isDone()) { - return@bind Promise.ofSuccess(Unit) - } else { + if (!deferred.promise.isDone()) { val responseList = (rawResponses["results"] as List) responseList.getOrNull(0)?.let { rawResponse -> if (rawResponse["code"] as? Int != 200) { Log.e("Loki", "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") - return@bind Promise.ofSuccess(Unit) + } else { + val body = rawResponse["body"] as? RawResponse + if (body == null) { + Log.e("Loki", "Batch sub-request didn't contain a body") + } else { + processConfig(snode, body, configFactory.user!!.configNamespace(), configFactory.user) + } } - val body = rawResponse["body"] as? RawResponse - if (body == null) { - Log.e("Loki", "Batch sub-request didn't contain a body") - return@bind Promise.ofSuccess(Unit) - } - processConfig(snode, body, configFactory.user!!.configNamespace(), configFactory.user) } - return@bind Promise.ofSuccess(Unit) } + Promise.ofSuccess(Unit) }.fail { Log.e("Loki", "Failed to get raw batch response", it) }