store server hash value for incoming messages

This commit is contained in:
ryanzhao
2021-08-17 14:34:49 +10:00
parent c4a3463416
commit ecc881bc7c
7 changed files with 21 additions and 12 deletions

View File

@@ -7,7 +7,7 @@ import org.session.libsession.messaging.sending_receiving.handle
import org.session.libsession.messaging.utilities.Data
import org.session.libsignal.utilities.Log
class MessageReceiveJob(val data: ByteArray, val openGroupMessageServerID: Long? = null, val openGroupID: String? = null) : Job {
class MessageReceiveJob(val data: ByteArray, val serverHash: String? = null, val openGroupMessageServerID: Long? = null, val openGroupID: String? = null) : Job {
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
@@ -21,6 +21,7 @@ class MessageReceiveJob(val data: ByteArray, val openGroupMessageServerID: Long?
// Keys used for database storage
private val DATA_KEY = "data"
private val SERVER_HASH_KEY = "serverHash"
private val OPEN_GROUP_MESSAGE_SERVER_ID_KEY = "openGroupMessageServerID"
private val OPEN_GROUP_ID_KEY = "open_group_id"
}
@@ -34,6 +35,7 @@ class MessageReceiveJob(val data: ByteArray, val openGroupMessageServerID: Long?
try {
val isRetry: Boolean = failureCount != 0
val (message, proto) = MessageReceiver.parse(this.data, this.openGroupMessageServerID)
message.serverHash = serverHash
synchronized(RECEIVE_LOCK) { // FIXME: Do we need this?
MessageReceiver.handle(message, proto, this.openGroupID)
}
@@ -67,6 +69,7 @@ class MessageReceiveJob(val data: ByteArray, val openGroupMessageServerID: Long?
override fun serialize(): Data {
val builder = Data.Builder().putByteArray(DATA_KEY, data)
serverHash?.let { builder.putString(SERVER_HASH_KEY, it) }
openGroupMessageServerID?.let { builder.putLong(OPEN_GROUP_MESSAGE_SERVER_ID_KEY, it) }
openGroupID?.let { builder.putString(OPEN_GROUP_ID_KEY, it) }
return builder.build();
@@ -81,6 +84,7 @@ class MessageReceiveJob(val data: ByteArray, val openGroupMessageServerID: Long?
override fun create(data: Data): MessageReceiveJob {
return MessageReceiveJob(
data.getByteArray(DATA_KEY),
data.getString(SERVER_HASH_KEY),
data.getLong(OPEN_GROUP_MESSAGE_SERVER_ID_KEY),
data.getString(OPEN_GROUP_ID_KEY)
)

View File

@@ -102,8 +102,8 @@ class ClosedGroupPollerV2 {
}
promise.success { envelopes ->
if (!isPolling(groupPublicKey)) { return@success }
envelopes.forEach { envelope ->
val job = MessageReceiveJob(envelope.toByteArray())
envelopes.forEach { (envelope, serverHash) ->
val job = MessageReceiveJob(envelope.toByteArray(), serverHash)
JobQueue.shared.add(job)
}
}

View File

@@ -91,8 +91,8 @@ class Poller {
task { Unit } // The long polling connection has been canceled; don't recurse
} else {
val messages = SnodeAPI.parseRawMessagesResponse(rawResponse, snode, userPublicKey)
messages.forEach { envelope ->
val job = MessageReceiveJob(envelope.toByteArray())
messages.forEach { (envelope, serverHash) ->
val job = MessageReceiveJob(envelope.toByteArray(), serverHash)
JobQueue.shared.add(job)
}
poll(snode, deferred)

View File

@@ -426,7 +426,7 @@ object SnodeAPI {
}
}
fun parseRawMessagesResponse(rawResponse: RawResponse, snode: Snode, publicKey: String): List<SignalServiceProtos.Envelope> {
fun parseRawMessagesResponse(rawResponse: RawResponse, snode: Snode, publicKey: String): List<Pair<SignalServiceProtos.Envelope, String?>> {
val messages = rawResponse["messages"] as? List<*>
return if (messages != null) {
updateLastMessageHashValueIfPossible(snode, publicKey, messages)
@@ -465,14 +465,14 @@ object SnodeAPI {
return result
}
private fun parseEnvelopes(rawMessages: List<*>): List<SignalServiceProtos.Envelope> {
private fun parseEnvelopes(rawMessages: List<*>): List<Pair<SignalServiceProtos.Envelope, String?>> {
return rawMessages.mapNotNull { rawMessage ->
val rawMessageAsJSON = rawMessage as? Map<*, *>
val base64EncodedData = rawMessageAsJSON?.get("data") as? String
val data = base64EncodedData?.let { Base64.decode(it) }
if (data != null) {
try {
MessageWrapper.unwrap(data)
Pair(MessageWrapper.unwrap(data), rawMessageAsJSON.get("hash") as? String)
} catch (e: Exception) {
Log.d("Loki", "Failed to unwrap data for message: ${rawMessage.prettifiedDescription()}.")
null
@@ -568,5 +568,5 @@ object SnodeAPI {
// Type Aliases
typealias RawResponse = Map<*, *>
typealias MessageListPromise = Promise<List<SignalServiceProtos.Envelope>, Exception>
typealias MessageListPromise = Promise<List<Pair<SignalServiceProtos.Envelope, String?>>, Exception>
typealias RawResponsePromise = Promise<RawResponse, Exception>