diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/ConfigurationSyncJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/ConfigurationSyncJob.kt
index b57d59f7ce..a9b454e3b2 100644
--- a/libsession/src/main/java/org/session/libsession/messaging/jobs/ConfigurationSyncJob.kt
+++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/ConfigurationSyncJob.kt
@@ -2,11 +2,13 @@ package org.session.libsession.messaging.jobs
 
 import network.loki.messenger.libsession_util.ConfigBase
 import network.loki.messenger.libsession_util.ConfigBase.Companion.protoKindFor
+import nl.komponents.kovenant.functional.bind
 import org.session.libsession.messaging.MessagingModuleConfiguration
 import org.session.libsession.messaging.messages.Destination
 import org.session.libsession.messaging.messages.control.SharedConfigurationMessage
 import org.session.libsession.messaging.sending_receiving.MessageSender
 import org.session.libsession.messaging.utilities.Data
+import org.session.libsession.snode.RawResponse
 import org.session.libsession.snode.SnodeAPI
 import org.session.libsession.utilities.TextSecurePreferences
 import org.session.libsignal.utilities.Log
@@ -91,61 +93,60 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
             Log.d(TAG, "Including delete request for current hashes")
         }
 
-        // TODO: re-add all this to do actual network sync job
-//        val batchResponse = SnodeAPI.getSingleTargetSnode(destination.destinationPublicKey()).bind { snode ->
-//            SnodeAPI.getRawBatchResponse(
-//                snode,
-//                destination.destinationPublicKey(),
-//                allRequests,
-//                sequence = true
-//            )
-//        }
-//
-//        try {
-//            val rawResponses = batchResponse.get()
-//            @Suppress("UNCHECKED_CAST")
-//            val responseList = (rawResponses["results"] as List<RawResponse>)
-//            // we are always adding in deletions at the end
-//            val deletionResponse = if (toDeleteRequest != null) responseList.last() else null
-//            val deletedHashes = deletionResponse?.let {
-//                @Suppress("UNCHECKED_CAST")
-//                // get the sub-request body
-//                (deletionResponse["body"] as? RawResponse)?.let { body ->
-//                    // get the swarm dict
-//                    body["swarm"] as? RawResponse
-//                }?.mapValues { (_, swarmDict) ->
-//                    // get the deleted values from dict
-//                    ((swarmDict as? RawResponse)?.get("deleted") as? List<String>)?.toSet() ?: emptySet()
-//                }?.values?.reduce { acc, strings ->
-//                    // create an intersection of all deleted hashes (common between all swarm nodes)
-//                    acc intersect strings
-//                }
-//            } ?: emptySet()
-//
-//            // at this point responseList index should line up with configsRequiringPush index
-//            configsRequiringPush.forEachIndexed { index, config ->
-//                val (toPushMessage, _) = batchObjects[index]!!
-//                val response = responseList[index]
-//                val responseBody = response["body"] as? RawResponse
-//                val insertHash = responseBody?.get("hash") as? String ?: run {
-//                    Log.w(TAG, "No hash returned for the configuration in namespace ${config.configNamespace()}")
-//                    return@forEachIndexed
-//                }
-//                Log.d(TAG, "Hash $insertHash returned from store request for new config")
-//
-//                // confirm pushed seqno
-//                val thisSeqNo = toPushMessage.seqNo
-//                config.confirmPushed(thisSeqNo, insertHash)
-//                Log.d(TAG, "Successfully removed the deleted hashes from ${config.javaClass.simpleName}")
-//                // dump and write config after successful
-//                if (config.needsDump()) { // usually this will be true?
-//                    configFactory.persist(config)
-//                }
-//            }
-//        } catch (e: Exception) {
-//            Log.e(TAG, "Error performing batch request", e)
-//            return delegate.handleJobFailed(this, dispatcherName, e)
-//        }
+        val batchResponse = SnodeAPI.getSingleTargetSnode(destination.destinationPublicKey()).bind { snode ->
+            SnodeAPI.getRawBatchResponse(
+                snode,
+                destination.destinationPublicKey(),
+                allRequests,
+                sequence = true
+            )
+        }
+
+        try {
+            val rawResponses = batchResponse.get()
+            @Suppress("UNCHECKED_CAST")
+            val responseList = (rawResponses["results"] as List<RawResponse>)
+            // we are always adding in deletions at the end
+            val deletionResponse = if (toDeleteRequest != null) responseList.last() else null
+            val deletedHashes = deletionResponse?.let {
+                @Suppress("UNCHECKED_CAST")
+                // get the sub-request body
+                (deletionResponse["body"] as? RawResponse)?.let { body ->
+                    // get the swarm dict
+                    body["swarm"] as? RawResponse
+                }?.mapValues { (_, swarmDict) ->
+                    // get the deleted values from dict
+                    ((swarmDict as? RawResponse)?.get("deleted") as? List<String>)?.toSet() ?: emptySet()
+                }?.values?.reduce { acc, strings ->
+                    // create an intersection of all deleted hashes (common between all swarm nodes)
+                    acc intersect strings
+                }
+            } ?: emptySet()
+
+            // at this point responseList index should line up with configsRequiringPush index
+            configsRequiringPush.forEachIndexed { index, config ->
+                val (toPushMessage, _) = batchObjects[index]!!
+                val response = responseList[index]
+                val responseBody = response["body"] as? RawResponse
+                val insertHash = responseBody?.get("hash") as? String ?: run {
+                    Log.w(TAG, "No hash returned for the configuration in namespace ${config.configNamespace()}")
+                    return@forEachIndexed
+                }
+                Log.d(TAG, "Hash $insertHash returned from store request for new config")
+
+                // confirm pushed seqno
+                val thisSeqNo = toPushMessage.seqNo
+                config.confirmPushed(thisSeqNo, insertHash)
+                Log.d(TAG, "Successfully removed the deleted hashes from ${config.javaClass.simpleName}")
+                // dump and write config after successful
+                if (config.needsDump()) { // usually this will be true?
+                    configFactory.persist(config)
+                }
+            }
+        } catch (e: Exception) {
+            Log.e(TAG, "Error performing batch request", e)
+            return delegate.handleJobFailed(this, dispatcherName, e)
+        }
         delegate.handleJobSucceeded(this, dispatcherName)
         if (shouldRunAgain.get() && storage.getConfigSyncJob(destination) == null) {
             // reschedule if something has updated since we started this job