Merge remote-tracking branch 'ry/poller-fix' into on-3

This commit is contained in:
bemusementpark 2024-07-05 16:55:18 +09:30
commit fa5b10e2e1
4 changed files with 139 additions and 57 deletions

View File

@ -453,6 +453,13 @@ public class ApplicationContext extends Application implements DefaultLifecycleO
ClosedGroupPollerV2.getShared().start(); ClosedGroupPollerV2.getShared().start();
} }
public void retrieveUserProfile() {
setUpPollingIfNeeded();
if (poller != null) {
poller.retrieveUserProfile();
}
}
private void resubmitProfilePictureIfNeeded() { private void resubmitProfilePictureIfNeeded() {
// Files expire on the file server after a while, so we simply re-upload the user's profile picture // Files expire on the file server after a while, so we simply re-upload the user's profile picture
// at a certain interval to ensure it's always available. // at a certain interval to ensure it's always available.

View File

@ -52,7 +52,7 @@ class LoadAccountManager @Inject constructor(
setHasViewedSeed(true) setHasViewedSeed(true)
} }
ApplicationContext.getInstance(context).apply { startPollingIfNeeded() } ApplicationContext.getInstance(context).retrieveUserProfile()
} }
} }
} }

View File

@ -62,6 +62,16 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti
hasStarted = false hasStarted = false
usedSnodes.clear() usedSnodes.clear()
} }
fun retrieveUserProfile() {
Log.d("Loki", "Retrieving user profile.")
SnodeAPI.getSwarm(userPublicKey).bind {
usedSnodes.clear()
deferred<Unit, Exception>().also {
pollNextSnode(userProfileOnly = true, it)
}.promise
}
}
// endregion // endregion
// region Private API // region Private API
@ -71,7 +81,7 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti
SnodeAPI.getSwarm(userPublicKey).bind { SnodeAPI.getSwarm(userPublicKey).bind {
usedSnodes.clear() usedSnodes.clear()
val deferred = deferred<Unit, Exception>() val deferred = deferred<Unit, Exception>()
pollNextSnode(deferred) pollNextSnode(deferred = deferred)
deferred.promise deferred.promise
}.success { }.success {
val nextDelay = if (isCaughtUp) retryInterval else 0 val nextDelay = if (isCaughtUp) retryInterval else 0
@ -90,7 +100,7 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti
} }
} }
private fun pollNextSnode(deferred: Deferred<Unit, Exception>) { private fun pollNextSnode(userProfileOnly: Boolean = false, deferred: Deferred<Unit, Exception>) {
val swarm = SnodeModule.shared.storage.getSwarm(userPublicKey) ?: setOf() val swarm = SnodeModule.shared.storage.getSwarm(userPublicKey) ?: setOf()
val unusedSnodes = swarm.subtract(usedSnodes) val unusedSnodes = swarm.subtract(usedSnodes)
if (unusedSnodes.isNotEmpty()) { if (unusedSnodes.isNotEmpty()) {
@ -98,13 +108,13 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti
val nextSnode = unusedSnodes.elementAt(index) val nextSnode = unusedSnodes.elementAt(index)
usedSnodes.add(nextSnode) usedSnodes.add(nextSnode)
Log.d("Loki", "Polling $nextSnode.") Log.d("Loki", "Polling $nextSnode.")
poll(nextSnode, deferred).fail { exception -> poll(userProfileOnly, nextSnode, deferred).fail { exception ->
if (exception is PromiseCanceledException) { if (exception is PromiseCanceledException) {
Log.d("Loki", "Polling $nextSnode canceled.") Log.d("Loki", "Polling $nextSnode canceled.")
} else { } else {
Log.d("Loki", "Polling $nextSnode failed; dropping it and switching to next snode.") Log.d("Loki", "Polling $nextSnode failed; dropping it and switching to next snode.")
SnodeAPI.dropSnodeFromSwarmIfNeeded(nextSnode, userPublicKey) SnodeAPI.dropSnodeFromSwarmIfNeeded(nextSnode, userPublicKey)
pollNextSnode(deferred) pollNextSnode(userProfileOnly, deferred)
} }
} }
} else { } else {
@ -158,6 +168,65 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti
} }
} }
private fun poll(userProfileOnly: Boolean, snode: Snode, deferred: Deferred<Unit, Exception>): Promise<Unit, Exception> {
if (userProfileOnly) {
return pollUserProfile(snode, deferred)
}
return poll(snode, deferred)
}
private fun pollUserProfile(snode: Snode, deferred: Deferred<Unit, Exception>): Promise<Unit, Exception> = task {
runBlocking(Dispatchers.IO) {
val requests = mutableListOf<SnodeAPI.SnodeBatchRequestInfo>()
val hashesToExtend = mutableSetOf<String>()
configFactory.user?.let { config ->
hashesToExtend += config.currentHashes()
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode, userPublicKey,
config.configNamespace(),
maxSize = -8
)
}?.let { request ->
requests += request
}
if (hashesToExtend.isNotEmpty()) {
SnodeAPI.buildAuthenticatedAlterTtlBatchRequest(
messageHashes = hashesToExtend.toList(),
publicKey = userPublicKey,
newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds,
extend = true
)?.let { extensionRequest ->
requests += extensionRequest
}
}
if (requests.isNotEmpty()) {
SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses ->
isCaughtUp = true
if (!deferred.promise.isDone()) {
val responseList = (rawResponses["results"] as List<RawResponse>)
responseList.getOrNull(0)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) {
Log.e("Loki", "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
} else {
val body = rawResponse["body"] as? RawResponse
if (body == null) {
Log.e("Loki", "Batch sub-request didn't contain a body")
} else {
processConfig(snode, body, configFactory.user!!.configNamespace(), configFactory.user)
}
}
}
}
Promise.ofSuccess(Unit)
}.fail {
Log.e("Loki", "Failed to get raw batch response", it)
}
}
}
}
private fun poll(snode: Snode, deferred: Deferred<Unit, Exception>): Promise<Unit, Exception> { private fun poll(snode: Snode, deferred: Deferred<Unit, Exception>): Promise<Unit, Exception> {
if (!hasStarted) { return Promise.ofFail(PromiseCanceledException()) } if (!hasStarted) { return Promise.ofFail(PromiseCanceledException()) }
return task { return task {
@ -196,6 +265,7 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti
} }
} }
if (requests.isNotEmpty()) {
SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses -> SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses ->
isCaughtUp = true isCaughtUp = true
if (deferred.promise.isDone()) { if (deferred.promise.isDone()) {
@ -241,6 +311,10 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti
responseList.getOrNull(personalResponseIndex)?.let { rawResponse -> responseList.getOrNull(personalResponseIndex)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) { if (rawResponse["code"] as? Int != 200) {
Log.e("Loki", "Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") Log.e("Loki", "Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
// If we got a non-success response then the snode might be bad so we should try rotate
// to a different one just in case
pollNextSnode(deferred = deferred)
return@bind Promise.ofSuccess(Unit)
} else { } else {
val body = rawResponse["body"] as? RawResponse val body = rawResponse["body"] as? RawResponse
if (body == null) { if (body == null) {
@ -261,5 +335,6 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti
} }
} }
} }
}
// endregion // endregion
} }

View File

@ -520,7 +520,7 @@ object SnodeAPI {
Log.w("Loki", "response code was not 200") Log.w("Loki", "response code was not 200")
handleSnodeError( handleSnodeError(
response["code"] as? Int ?: 0, response["code"] as? Int ?: 0,
response, response["body"] as? Map<*, *>,
snode, snode,
publicKey publicKey
) )