refactor: re-add the sending of configs

This commit is contained in:
0x330a 2023-05-22 15:51:05 +10:00
parent 9c206bad64
commit 1a3f432f85
No known key found for this signature in database
GPG Key ID: 267811D6E6A2698C

View File

@@ -2,11 +2,13 @@ package org.session.libsession.messaging.jobs
import network.loki.messenger.libsession_util.ConfigBase
import network.loki.messenger.libsession_util.ConfigBase.Companion.protoKindFor
import nl.komponents.kovenant.functional.bind
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.messaging.messages.control.SharedConfigurationMessage
import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsignal.utilities.Log
@@ -91,61 +93,60 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
Log.d(TAG, "Including delete request for current hashes")
}
// TODO: re-add all this to do actual network sync job
// val batchResponse = SnodeAPI.getSingleTargetSnode(destination.destinationPublicKey()).bind { snode ->
// SnodeAPI.getRawBatchResponse(
// snode,
// destination.destinationPublicKey(),
// allRequests,
// sequence = true
// )
// }
//
// try {
// val rawResponses = batchResponse.get()
// @Suppress("UNCHECKED_CAST")
// val responseList = (rawResponses["results"] as List<RawResponse>)
// // we are always adding in deletions at the end
// val deletionResponse = if (toDeleteRequest != null) responseList.last() else null
// val deletedHashes = deletionResponse?.let {
// @Suppress("UNCHECKED_CAST")
// // get the sub-request body
// (deletionResponse["body"] as? RawResponse)?.let { body ->
// // get the swarm dict
// body["swarm"] as? RawResponse
// }?.mapValues { (_, swarmDict) ->
// // get the deleted values from dict
// ((swarmDict as? RawResponse)?.get("deleted") as? List<String>)?.toSet() ?: emptySet()
// }?.values?.reduce { acc, strings ->
// // create an intersection of all deleted hashes (common between all swarm nodes)
// acc intersect strings
// }
// } ?: emptySet()
//
// // at this point responseList index should line up with configsRequiringPush index
// configsRequiringPush.forEachIndexed { index, config ->
// val (toPushMessage, _) = batchObjects[index]!!
// val response = responseList[index]
// val responseBody = response["body"] as? RawResponse
// val insertHash = responseBody?.get("hash") as? String ?: run {
// Log.w(TAG, "No hash returned for the configuration in namespace ${config.configNamespace()}")
// return@forEachIndexed
// }
// Log.d(TAG, "Hash $insertHash returned from store request for new config")
//
// // confirm pushed seqno
// val thisSeqNo = toPushMessage.seqNo
// config.confirmPushed(thisSeqNo, insertHash)
// Log.d(TAG, "Successfully removed the deleted hashes from ${config.javaClass.simpleName}")
// // dump and write config after successful
// if (config.needsDump()) { // usually this will be true?
// configFactory.persist(config)
// }
// }
// } catch (e: Exception) {
// Log.e(TAG, "Error performing batch request", e)
// return delegate.handleJobFailed(this, dispatcherName, e)
// }
val batchResponse = SnodeAPI.getSingleTargetSnode(destination.destinationPublicKey()).bind { snode ->
SnodeAPI.getRawBatchResponse(
snode,
destination.destinationPublicKey(),
allRequests,
sequence = true
)
}
try {
val rawResponses = batchResponse.get()
@Suppress("UNCHECKED_CAST")
val responseList = (rawResponses["results"] as List<RawResponse>)
// we are always adding in deletions at the end
val deletionResponse = if (toDeleteRequest != null) responseList.last() else null
val deletedHashes = deletionResponse?.let {
@Suppress("UNCHECKED_CAST")
// get the sub-request body
(deletionResponse["body"] as? RawResponse)?.let { body ->
// get the swarm dict
body["swarm"] as? RawResponse
}?.mapValues { (_, swarmDict) ->
// get the deleted values from dict
((swarmDict as? RawResponse)?.get("deleted") as? List<String>)?.toSet() ?: emptySet()
}?.values?.reduce { acc, strings ->
// create an intersection of all deleted hashes (common between all swarm nodes)
acc intersect strings
}
} ?: emptySet()
// at this point responseList index should line up with configsRequiringPush index
configsRequiringPush.forEachIndexed { index, config ->
val (toPushMessage, _) = batchObjects[index]!!
val response = responseList[index]
val responseBody = response["body"] as? RawResponse
val insertHash = responseBody?.get("hash") as? String ?: run {
Log.w(TAG, "No hash returned for the configuration in namespace ${config.configNamespace()}")
return@forEachIndexed
}
Log.d(TAG, "Hash $insertHash returned from store request for new config")
// confirm pushed seqno
val thisSeqNo = toPushMessage.seqNo
config.confirmPushed(thisSeqNo, insertHash)
Log.d(TAG, "Successfully removed the deleted hashes from ${config.javaClass.simpleName}")
// dump and write config after successful
if (config.needsDump()) { // usually this will be true?
configFactory.persist(config)
}
}
} catch (e: Exception) {
Log.e(TAG, "Error performing batch request", e)
return delegate.handleJobFailed(this, dispatcherName, e)
}
delegate.handleJobSucceeded(this, dispatcherName)
if (shouldRunAgain.get() && storage.getConfigSyncJob(destination) == null) {
// reschedule if something has updated since we started this job