This commit is contained in:
Ryan ZHAO 2024-06-03 10:19:06 +10:00
parent a37039cebf
commit 9e025f1b9d

View File

@ -66,9 +66,9 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti
Log.d("Loki", "Retrieving user profile.") Log.d("Loki", "Retrieving user profile.")
SnodeAPI.getSwarm(userPublicKey).bind { SnodeAPI.getSwarm(userPublicKey).bind {
usedSnodes.clear() usedSnodes.clear()
val deferred = deferred<Unit, Exception>() deferred<Unit, Exception>().also {
pollNextSnode(userProfileOnly = true, deferred) pollNextSnode(userProfileOnly = true, it)
deferred.promise }.promise
} }
} }
// endregion // endregion
@ -185,33 +185,33 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti
return poll(snode, deferred) return poll(snode, deferred)
} }
private fun pollUserProfile(snode: Snode, deferred: Deferred<Unit, Exception>): Promise<Unit, Exception> { private fun pollUserProfile(snode: Snode, deferred: Deferred<Unit, Exception>): Promise<Unit, Exception> = task {
return task { runBlocking(Dispatchers.IO) {
runBlocking(Dispatchers.IO) { val requests = mutableListOf<SnodeAPI.SnodeBatchRequestInfo>()
val requests = mutableListOf<SnodeAPI.SnodeBatchRequestInfo>() val hashesToExtend = mutableSetOf<String>()
val hashesToExtend = mutableSetOf<String>() configFactory.user?.let { config ->
configFactory.user?.let { config -> hashesToExtend += config.currentHashes()
hashesToExtend += config.currentHashes() SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
SnodeAPI.buildAuthenticatedRetrieveBatchRequest( snode, userPublicKey,
snode, userPublicKey, config.configNamespace(),
config.configNamespace(), maxSize = -8
maxSize = -8 )
) }?.let { request ->
}?.let { request -> requests += request
requests += request }
}
if (hashesToExtend.isNotEmpty()) { if (hashesToExtend.isNotEmpty()) {
SnodeAPI.buildAuthenticatedAlterTtlBatchRequest( SnodeAPI.buildAuthenticatedAlterTtlBatchRequest(
messageHashes = hashesToExtend.toList(), messageHashes = hashesToExtend.toList(),
publicKey = userPublicKey, publicKey = userPublicKey,
newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds, newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds,
extend = true extend = true
)?.let { extensionRequest -> )?.let { extensionRequest ->
requests += extensionRequest requests += extensionRequest
}
} }
}
if (requests.isNotEmpty()) {
SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses -> SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses ->
isCaughtUp = true isCaughtUp = true
if (deferred.promise.isDone()) { if (deferred.promise.isDone()) {
@ -277,71 +277,73 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti
} }
} }
SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses -> if (requests.isNotEmpty()) {
isCaughtUp = true SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses ->
if (deferred.promise.isDone()) { isCaughtUp = true
return@bind Promise.ofSuccess(Unit) if (deferred.promise.isDone()) {
} else { return@bind Promise.ofSuccess(Unit)
val responseList = (rawResponses["results"] as List<RawResponse>) } else {
// in case we had null configs, the array won't be fully populated val responseList = (rawResponses["results"] as List<RawResponse>)
// index of the sparse array key iterator should be the request index, with the key being the namespace // in case we had null configs, the array won't be fully populated
listOfNotNull( // index of the sparse array key iterator should be the request index, with the key being the namespace
configFactory.user?.configNamespace(), listOfNotNull(
configFactory.contacts?.configNamespace(), configFactory.user?.configNamespace(),
configFactory.userGroups?.configNamespace(), configFactory.contacts?.configNamespace(),
configFactory.convoVolatile?.configNamespace() configFactory.userGroups?.configNamespace(),
).map { configFactory.convoVolatile?.configNamespace()
it to requestSparseArray.indexOfKey(it) ).map {
}.filter { (_, i) -> i >= 0 }.forEach { (key, requestIndex) -> it to requestSparseArray.indexOfKey(it)
responseList.getOrNull(requestIndex)?.let { rawResponse -> }.filter { (_, i) -> i >= 0 }.forEach { (key, requestIndex) ->
if (rawResponse["code"] as? Int != 200) { responseList.getOrNull(requestIndex)?.let { rawResponse ->
Log.e("Loki", "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") if (rawResponse["code"] as? Int != 200) {
return@forEach Log.e("Loki", "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
} return@forEach
val body = rawResponse["body"] as? RawResponse
if (body == null) {
Log.e("Loki", "Batch sub-request didn't contain a body")
return@forEach
}
if (key == Namespace.DEFAULT) {
return@forEach // continue, skip default namespace
} else {
when (ConfigBase.kindFor(key)) {
UserProfile::class.java -> processConfig(snode, body, key, configFactory.user)
Contacts::class.java -> processConfig(snode, body, key, configFactory.contacts)
ConversationVolatileConfig::class.java -> processConfig(snode, body, key, configFactory.convoVolatile)
UserGroupsConfig::class.java -> processConfig(snode, body, key, configFactory.userGroups)
} }
}
}
}
// the first response will be the personal messages (we want these to be processed after config messages)
val personalResponseIndex = requestSparseArray.indexOfKey(Namespace.DEFAULT)
if (personalResponseIndex >= 0) {
responseList.getOrNull(personalResponseIndex)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) {
Log.e("Loki", "Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
// If we got a non-success response then the snode might be bad so we should try rotate
// to a different one just in case
pollNextSnode(deferred = deferred)
return@bind Promise.ofSuccess(Unit)
} else {
val body = rawResponse["body"] as? RawResponse val body = rawResponse["body"] as? RawResponse
if (body == null) { if (body == null) {
Log.e("Loki", "Batch sub-request for personal messages didn't contain a body") Log.e("Loki", "Batch sub-request didn't contain a body")
return@forEach
}
if (key == Namespace.DEFAULT) {
return@forEach // continue, skip default namespace
} else { } else {
processPersonalMessages(snode, body) when (ConfigBase.kindFor(key)) {
UserProfile::class.java -> processConfig(snode, body, key, configFactory.user)
Contacts::class.java -> processConfig(snode, body, key, configFactory.contacts)
ConversationVolatileConfig::class.java -> processConfig(snode, body, key, configFactory.convoVolatile)
UserGroupsConfig::class.java -> processConfig(snode, body, key, configFactory.userGroups)
}
} }
} }
} }
}
// the first response will be the personal messages (we want these to be processed after config messages)
val personalResponseIndex = requestSparseArray.indexOfKey(Namespace.DEFAULT)
if (personalResponseIndex >= 0) {
responseList.getOrNull(personalResponseIndex)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) {
Log.e("Loki", "Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
// If we got a non-success response then the snode might be bad so we should try rotate
// to a different one just in case
pollNextSnode(deferred = deferred)
return@bind Promise.ofSuccess(Unit)
} else {
val body = rawResponse["body"] as? RawResponse
if (body == null) {
Log.e("Loki", "Batch sub-request for personal messages didn't contain a body")
} else {
processPersonalMessages(snode, body)
}
}
}
}
poll(snode, deferred)
}
}.fail {
Log.e("Loki", "Failed to get raw batch response", it)
poll(snode, deferred) poll(snode, deferred)
} }
}.fail {
Log.e("Loki", "Failed to get raw batch response", it)
poll(snode, deferred)
} }
} }
} }