mirror of
https://github.com/oxen-io/session-android.git
synced 2024-11-30 13:35:18 +00:00
[SES-2162] - Remove wrapping of config message (#1517)
* Remove wrapping of config message * Addresses feedback * Merged in ThreadUtils fix * JDK installation * Revert JDK change --------- Co-authored-by: fanchao <git@fanchao.dev>
This commit is contained in:
parent
a0e6167718
commit
0f47076192
@ -5,7 +5,6 @@ import kotlinx.coroutines.asExecutor
|
|||||||
import nl.komponents.kovenant.Kovenant
|
import nl.komponents.kovenant.Kovenant
|
||||||
import nl.komponents.kovenant.jvm.asDispatcher
|
import nl.komponents.kovenant.jvm.asDispatcher
|
||||||
import org.session.libsignal.utilities.Log
|
import org.session.libsignal.utilities.Log
|
||||||
import org.session.libsignal.utilities.ThreadUtils
|
|
||||||
import java.util.concurrent.Executors
|
import java.util.concurrent.Executors
|
||||||
|
|
||||||
object AppContext {
|
object AppContext {
|
||||||
|
@ -1 +1 @@
|
|||||||
Subproject commit 626b6628a2af8fff798042416b3b469b8bfc6ecf
|
Subproject commit 4b6f595fdbd3b5f6fba380253e560d8ee296b734
|
@ -61,7 +61,7 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
|
|||||||
SharedConfigurationMessage(config.protoKindFor(), data, seqNo) to config
|
SharedConfigurationMessage(config.protoKindFor(), data, seqNo) to config
|
||||||
}.map { (message, config) ->
|
}.map { (message, config) ->
|
||||||
// return a list of batch request objects
|
// return a list of batch request objects
|
||||||
val snodeMessage = MessageSender.buildWrappedMessageToSnode(destination, message, true)
|
val snodeMessage = MessageSender.buildConfigMessageToSnode(destination.destinationPublicKey(), message)
|
||||||
val authenticated = SnodeAPI.buildAuthenticatedStoreBatchInfo(
|
val authenticated = SnodeAPI.buildAuthenticatedStoreBatchInfo(
|
||||||
destination.destinationPublicKey(),
|
destination.destinationPublicKey(),
|
||||||
config.configNamespace(),
|
config.configNamespace(),
|
||||||
|
@ -81,6 +81,15 @@ object MessageSender {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun buildConfigMessageToSnode(destinationPubKey: String, message: SharedConfigurationMessage): SnodeMessage {
|
||||||
|
return SnodeMessage(
|
||||||
|
destinationPubKey,
|
||||||
|
Base64.encodeBytes(message.data),
|
||||||
|
ttl = message.ttl,
|
||||||
|
SnodeAPI.nowWithOffset
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
// One-on-One Chats & Closed Groups
|
// One-on-One Chats & Closed Groups
|
||||||
@Throws(Exception::class)
|
@Throws(Exception::class)
|
||||||
fun buildWrappedMessageToSnode(destination: Destination, message: Message, isSyncMessage: Boolean): SnodeMessage {
|
fun buildWrappedMessageToSnode(destination: Destination, message: Message, isSyncMessage: Boolean): SnodeMessage {
|
||||||
|
@ -25,6 +25,7 @@ import org.session.libsession.snode.RawResponse
|
|||||||
import org.session.libsession.snode.SnodeAPI
|
import org.session.libsession.snode.SnodeAPI
|
||||||
import org.session.libsession.snode.SnodeModule
|
import org.session.libsession.snode.SnodeModule
|
||||||
import org.session.libsession.utilities.ConfigFactoryProtocol
|
import org.session.libsession.utilities.ConfigFactoryProtocol
|
||||||
|
import org.session.libsignal.utilities.Base64
|
||||||
import org.session.libsignal.utilities.Log
|
import org.session.libsignal.utilities.Log
|
||||||
import org.session.libsignal.utilities.Namespace
|
import org.session.libsignal.utilities.Namespace
|
||||||
import org.session.libsignal.utilities.Snode
|
import org.session.libsignal.utilities.Snode
|
||||||
@ -126,37 +127,26 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti
|
|||||||
private fun processConfig(snode: Snode, rawMessages: RawResponse, namespace: Int, forConfigObject: ConfigBase?) {
|
private fun processConfig(snode: Snode, rawMessages: RawResponse, namespace: Int, forConfigObject: ConfigBase?) {
|
||||||
if (forConfigObject == null) return
|
if (forConfigObject == null) return
|
||||||
|
|
||||||
val messages = SnodeAPI.parseRawMessagesResponse(
|
val messages = rawMessages["messages"] as? List<*>
|
||||||
rawMessages,
|
val processed = if (!messages.isNullOrEmpty()) {
|
||||||
snode,
|
SnodeAPI.updateLastMessageHashValueIfPossible(snode, userPublicKey, messages, namespace)
|
||||||
userPublicKey,
|
SnodeAPI.removeDuplicates(userPublicKey, messages, namespace, true).mapNotNull { messageBody ->
|
||||||
namespace,
|
val rawMessageAsJSON = messageBody as? Map<*, *> ?: return@mapNotNull null
|
||||||
updateLatestHash = true,
|
val hashValue = rawMessageAsJSON["hash"] as? String ?: return@mapNotNull null
|
||||||
updateStoredHashes = true,
|
val b64EncodedBody = rawMessageAsJSON["data"] as? String ?: return@mapNotNull null
|
||||||
)
|
val timestamp = rawMessageAsJSON["t"] as? Long ?: SnodeAPI.nowWithOffset
|
||||||
|
val body = Base64.decode(b64EncodedBody)
|
||||||
|
Triple(body, hashValue, timestamp)
|
||||||
|
}
|
||||||
|
} else emptyList()
|
||||||
|
|
||||||
if (messages.isEmpty()) {
|
if (processed.isEmpty()) return
|
||||||
// no new messages to process
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var latestMessageTimestamp: Long? = null
|
var latestMessageTimestamp: Long? = null
|
||||||
messages.forEach { (envelope, hash) ->
|
processed.forEach { (body, hash, timestamp) ->
|
||||||
try {
|
try {
|
||||||
val (message, _) = MessageReceiver.parse(data = envelope.toByteArray(),
|
forConfigObject.merge(hash to body)
|
||||||
// assume no groups in personal poller messages
|
latestMessageTimestamp = if (timestamp > (latestMessageTimestamp ?: 0L)) { timestamp } else { latestMessageTimestamp }
|
||||||
openGroupServerID = null, currentClosedGroups = emptySet()
|
|
||||||
)
|
|
||||||
// sanity checks
|
|
||||||
if (message !is SharedConfigurationMessage) {
|
|
||||||
Log.w("Loki", "shared config message handled in configs wasn't SharedConfigurationMessage but was ${message.javaClass.simpleName}")
|
|
||||||
return@forEach
|
|
||||||
}
|
|
||||||
val merged = forConfigObject.merge(hash!! to message.data).firstOrNull { it == hash }
|
|
||||||
if (merged != null) {
|
|
||||||
// We successfully merged the hash, we can now update the timestamp
|
|
||||||
latestMessageTimestamp = if ((message.sentTimestamp ?: 0L) > (latestMessageTimestamp ?: 0L)) { message.sentTimestamp } else { latestMessageTimestamp }
|
|
||||||
}
|
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
Log.e("Loki", e)
|
Log.e("Loki", e)
|
||||||
}
|
}
|
||||||
|
@ -829,7 +829,7 @@ object SnodeAPI {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun updateLastMessageHashValueIfPossible(snode: Snode, publicKey: String, rawMessages: List<*>, namespace: Int) {
|
fun updateLastMessageHashValueIfPossible(snode: Snode, publicKey: String, rawMessages: List<*>, namespace: Int) {
|
||||||
val lastMessageAsJSON = rawMessages.lastOrNull() as? Map<*, *>
|
val lastMessageAsJSON = rawMessages.lastOrNull() as? Map<*, *>
|
||||||
val hashValue = lastMessageAsJSON?.get("hash") as? String
|
val hashValue = lastMessageAsJSON?.get("hash") as? String
|
||||||
if (hashValue != null) {
|
if (hashValue != null) {
|
||||||
@ -839,7 +839,7 @@ object SnodeAPI {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun removeDuplicates(publicKey: String, rawMessages: List<*>, namespace: Int, updateStoredHashes: Boolean): List<*> {
|
fun removeDuplicates(publicKey: String, rawMessages: List<*>, namespace: Int, updateStoredHashes: Boolean): List<*> {
|
||||||
val originalMessageHashValues = database.getReceivedMessageHashValues(publicKey, namespace)?.toMutableSet() ?: mutableSetOf()
|
val originalMessageHashValues = database.getReceivedMessageHashValues(publicKey, namespace)?.toMutableSet() ?: mutableSetOf()
|
||||||
val receivedMessageHashValues = originalMessageHashValues.toMutableSet()
|
val receivedMessageHashValues = originalMessageHashValues.toMutableSet()
|
||||||
val result = rawMessages.filter { rawMessage ->
|
val result = rawMessages.filter { rawMessage ->
|
||||||
|
Loading…
Reference in New Issue
Block a user