feat: syncs the user profile stuff for now, and errors back to placeholder instead of unknown recipient

This commit is contained in:
0x330a
2023-02-13 16:43:45 +11:00
parent fb21f58cbd
commit 03a343d832
8 changed files with 77 additions and 33 deletions

View File

@@ -85,7 +85,10 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
val allRequests = mutableListOf<SnodeAPI.SnodeBatchRequestInfo>()
allRequests += batchObjects.requireNoNulls().map { (_, request) -> request }
// add in the deletion if we have any hashes
if (toDeleteRequest != null) allRequests += toDeleteRequest
if (toDeleteRequest != null) {
allRequests += toDeleteRequest
Log.d(TAG, "Including delete request for current hashes")
}
val batchResponse = SnodeAPI.getSingleTargetSnode(destination.destinationPublicKey()).bind { snode ->
SnodeAPI.getRawBatchResponse(
@@ -104,17 +107,29 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
val deletionResponse = if (toDeleteRequest != null) responseList.last() else null
val deletedHashes = deletionResponse?.let {
@Suppress("UNCHECKED_CAST")
deletionResponse["deleted"] as? List<String>
}?.toSet() ?: emptySet()
// get the sub-request body
(deletionResponse["body"] as? RawResponse)?.let { body ->
// get the swarm dict
body["swarm"] as? RawResponse
}?.mapValues { (_, swarmDict) ->
// get the deleted values from dict
((swarmDict as? RawResponse)?.get("deleted") as? List<String>)?.toSet() ?: emptySet()
}?.values?.reduce { acc, strings ->
// create an intersection of all deleted hashes (common between all swarm nodes)
acc intersect strings
}
} ?: emptySet()
// at this point responseList index should line up with configsRequiringPush index
configsRequiringPush.forEachIndexed { index, config ->
val (toPushMessage, _) = batchObjects[index]!!
val response = responseList[index]
val insertHash = response["hash"] as? String ?: run {
val responseBody = response["body"] as? RawResponse
val insertHash = responseBody?.get("hash") as? String ?: run {
Log.w(TAG, "No hash returned for the configuration in namespace ${config.configNamespace()}")
return@forEachIndexed
}
Log.d(TAG, "Hash $insertHash returned from store request for new config")
// confirm pushed seqno
val thisSeqNo = toPushMessage.seqNo
@@ -129,7 +144,8 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
configFactory.persist(config)
}
} catch (e: Exception) {
Log.e(TAG, "Error performing batch request")
Log.e(TAG, "Error performing batch request", e)
return delegate.handleJobFailedPermanently(this, e)
}
delegate.handleJobSucceeded(this)
}

View File

@@ -115,7 +115,7 @@ class JobQueue : JobDelegate {
while (isActive) {
when (val job = queue.receive()) {
is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> {
is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob, is ConfigurationSyncJob -> {
txQueue.send(job)
}
is AttachmentDownloadJob -> {

View File

@@ -26,7 +26,6 @@ import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeModule
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsignal.utilities.JsonUtil
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import org.session.libsignal.utilities.Snode
@@ -133,8 +132,18 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
snode,
userPublicKey,
namespace,
updateLatestHash = false
)
updateLatestHash = false,
updateStoredHashes = false,
).filter { (_, hash) -> !configFactory.getHashesFor(forConfigObject).contains(hash) }
if (messages.isEmpty()) {
// no new messages to process
return
}
Log.d("Loki-DBG", "Received configs with hashes: ${messages.map { it.second }}")
Log.d("Loki-DBG", "Hashes we have for config: ${configFactory.getHashesFor(forConfigObject)}")
messages.forEach { (envelope, hash) ->
try {
val (message, _) = MessageReceiver.parse(data = envelope.toByteArray(), openGroupServerID = null)
@@ -143,7 +152,6 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
Log.w("Loki-DBG", "shared config message handled in configs wasn't SharedConfigurationMessage but was ${message.javaClass.simpleName}")
return@forEach
}
// maybe do something with seqNo ?
Log.d("Loki-DBG", "Merging config of kind ${message.kind} into ${forConfigObject.javaClass.simpleName}")
forConfigObject.merge(message.data)
configFactory.appendHash(forConfigObject, hash!!)
@@ -184,20 +192,27 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
if (deferred.promise.isDone()) {
return@bind Promise.ofSuccess(Unit)
} else {
// TODO: remove log after testing responses
Log.d("Loki-DBG", JsonUtil.toJson(rawResponses))
val requestList = (rawResponses["results"] as List<RawResponse>)
// in case we had null configs, the array won't be fully populated
// index of the sparse array key iterator should be the request index, with the key being the namespace
requestSparseArray.keyIterator().withIndex().forEach { (requestIndex, key) ->
requestList.getOrNull(requestIndex)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) {
Log.e("Loki-DBG", "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
return@forEach
}
val body = rawResponse["body"] as? RawResponse
if (body == null) {
Log.e("Loki-DBG", "Batch sub-request didn't contain a body")
return@forEach
}
if (key == Namespace.DEFAULT) {
processPersonalMessages(snode, rawResponse)
processPersonalMessages(snode, body)
} else {
when (ConfigBase.kindFor(key)) {
UserProfile::class.java -> processConfig(snode, rawResponse, key, configFactory.user)
Contacts::class.java -> processConfig(snode, rawResponse, key, configFactory.contacts)
ConversationVolatileConfig::class.java -> processConfig(snode, rawResponse, key, configFactory.convoVolatile)
UserProfile::class.java -> processConfig(snode, body, key, configFactory.user)
Contacts::class.java -> processConfig(snode, body, key, configFactory.contacts)
ConversationVolatileConfig::class.java -> processConfig(snode, body, key, configFactory.convoVolatile)
}
}
}

View File

@@ -442,7 +442,7 @@ object SnodeAPI {
params["pubkey_ed25519"] = ed25519PublicKey
params["signature"] = Base64.encodeBytes(signature)
return SnodeBatchRequestInfo(
Snode.Method.Retrieve.rawValue,
Snode.Method.DeleteMessage.rawValue,
params,
null
)
@@ -711,13 +711,13 @@ object SnodeAPI {
}
}
fun parseRawMessagesResponse(rawResponse: RawResponse, snode: Snode, publicKey: String, namespace: Int = 0, updateLatestHash: Boolean = true): List<Pair<SignalServiceProtos.Envelope, String?>> {
fun parseRawMessagesResponse(rawResponse: RawResponse, snode: Snode, publicKey: String, namespace: Int = 0, updateLatestHash: Boolean = true, updateStoredHashes: Boolean = true): List<Pair<SignalServiceProtos.Envelope, String?>> {
val messages = rawResponse["messages"] as? List<*>
return if (messages != null) {
if (updateLatestHash) {
updateLastMessageHashValueIfPossible(snode, publicKey, messages, namespace)
}
val newRawMessages = removeDuplicates(publicKey, messages, namespace)
val newRawMessages = removeDuplicates(publicKey, messages, namespace, updateStoredHashes)
return parseEnvelopes(newRawMessages)
} else {
listOf()
@@ -734,7 +734,7 @@ object SnodeAPI {
}
}
private fun removeDuplicates(publicKey: String, rawMessages: List<*>, namespace: Int): List<*> {
private fun removeDuplicates(publicKey: String, rawMessages: List<*>, namespace: Int, updateStoredHashes: Boolean): List<*> {
val originalMessageHashValues = database.getReceivedMessageHashValues(publicKey, namespace)?.toMutableSet() ?: mutableSetOf()
val receivedMessageHashValues = originalMessageHashValues.toMutableSet()
val result = rawMessages.filter { rawMessage ->
@@ -749,7 +749,7 @@ object SnodeAPI {
false
}
}
if (originalMessageHashValues != receivedMessageHashValues) {
if (originalMessageHashValues != receivedMessageHashValues && updateStoredHashes) {
database.setReceivedMessageHashValues(publicKey, receivedMessageHashValues, namespace)
}
return result