mirror of
https://github.com/oxen-io/session-android.git
synced 2024-11-21 23:15:23 +00:00
Optimise SnodesAPI
This commit is contained in:
parent
482f169df1
commit
c1d40cdbe7
@ -18,6 +18,8 @@ import nl.komponents.kovenant.task
|
||||
import org.session.libsession.messaging.MessagingModuleConfiguration
|
||||
import org.session.libsession.messaging.utilities.MessageWrapper
|
||||
import org.session.libsession.messaging.utilities.SodiumUtilities.sodium
|
||||
import org.session.libsession.utilities.buildMutableMap
|
||||
import org.session.libsession.utilities.mapValuesNotNull
|
||||
import org.session.libsession.utilities.toByteArray
|
||||
import org.session.libsignal.crypto.getRandomElement
|
||||
import org.session.libsignal.database.LokiAPIDatabaseProtocol
|
||||
@ -73,20 +75,18 @@ object SnodeAPI {
|
||||
// Use port 4433 if the API level can handle the network security configuration and enforce pinned certificates
|
||||
private val seedNodePort = if (Build.VERSION.SDK_INT < Build.VERSION_CODES.N) 443 else 4443
|
||||
|
||||
private const val snodeFailureThreshold = 3
|
||||
private const val useOnionRequests = true
|
||||
|
||||
private const val useTestnet = false
|
||||
|
||||
private val seedNodePool = if (useTestnet) {
|
||||
setOf( "http://public.loki.foundation:38157" )
|
||||
} else {
|
||||
setOf(
|
||||
"https://seed1.getsession.org:$seedNodePort",
|
||||
"https://seed2.getsession.org:$seedNodePort",
|
||||
"https://seed3.getsession.org:$seedNodePort",
|
||||
)
|
||||
}
|
||||
private val seedNodePool = if (useTestnet) setOf(
|
||||
"http://public.loki.foundation:38157"
|
||||
) else setOf(
|
||||
"https://seed1.getsession.org:$seedNodePort",
|
||||
"https://seed2.getsession.org:$seedNodePort",
|
||||
"https://seed3.getsession.org:$seedNodePort",
|
||||
)
|
||||
|
||||
private const val snodeFailureThreshold = 3
|
||||
private const val useOnionRequests = true
|
||||
|
||||
private const val KEY_IP = "public_ip"
|
||||
private const val KEY_PORT = "storage_port"
|
||||
@ -121,48 +121,45 @@ object SnodeAPI {
|
||||
parameters: Map<String, Any>,
|
||||
publicKey: String? = null,
|
||||
version: Version = Version.V3
|
||||
): RawResponsePromise = if (useOnionRequests) OnionRequestAPI.sendOnionRequest(method, parameters, snode, version, publicKey).map {
|
||||
val body = it.body ?: throw Error.Generic
|
||||
JsonUtil.fromJson(body, Map::class.java)
|
||||
} else task {
|
||||
val payload = mapOf( "method" to method.rawValue, "params" to parameters )
|
||||
try {
|
||||
val url = "${snode.address}:${snode.port}/storage_rpc/v1"
|
||||
val response = HTTP.execute(HTTP.Verb.POST, url, payload).toString()
|
||||
JsonUtil.fromJson(response, Map::class.java)
|
||||
} catch (exception: Exception) {
|
||||
(exception as? HTTP.HTTPRequestFailedException)?.run {
|
||||
handleSnodeError(statusCode, json, snode, publicKey)
|
||||
// TODO Check if we meant to throw the error returned by handleSnodeError
|
||||
throw exception
|
||||
): RawResponsePromise = when {
|
||||
useOnionRequests -> OnionRequestAPI.sendOnionRequest(method, parameters, snode, version, publicKey).map {
|
||||
JsonUtil.fromJson(it.body ?: throw Error.Generic, Map::class.java)
|
||||
}
|
||||
else -> task {
|
||||
HTTP.execute(
|
||||
HTTP.Verb.POST,
|
||||
url = "${snode.address}:${snode.port}/storage_rpc/v1",
|
||||
parameters = buildMap {
|
||||
this["method"] = method.rawValue
|
||||
this["params"] = parameters
|
||||
}
|
||||
Log.d("Loki", "Unhandled exception: $exception.")
|
||||
throw exception
|
||||
).toString().let {
|
||||
JsonUtil.fromJson(it, Map::class.java)
|
||||
}
|
||||
}.fail { e ->
|
||||
when (e) {
|
||||
is HTTP.HTTPRequestFailedException -> handleSnodeError(e.statusCode, e.json, snode, publicKey)
|
||||
else -> Log.d("Loki", "Unhandled exception: $e.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val GET_RANDOM_SNODE_PARAMS = buildMap<String, Any> {
|
||||
this["method"] = "get_n_service_nodes"
|
||||
this["params"] = buildMap {
|
||||
this["active_only"] = true
|
||||
this["fields"] = sequenceOf(KEY_IP, KEY_PORT, KEY_X25519, KEY_ED25519, KEY_VERSION).associateWith { true }
|
||||
}
|
||||
}
|
||||
|
||||
internal fun getRandomSnode(): Promise<Snode, Exception> =
|
||||
snodePool.takeIf { it.size >= minimumSnodePoolCount }?.let { Promise.of(it.getRandomElement()) } ?: task {
|
||||
snodePool.takeIf { it.size >= minimumSnodePoolCount }?.getRandomElement()?.let { Promise.of(it) } ?: task {
|
||||
val target = seedNodePool.random()
|
||||
val url = "$target/json_rpc"
|
||||
Log.d("Loki", "Populating snode pool using: $target.")
|
||||
val parameters = mapOf(
|
||||
"method" to "get_n_service_nodes",
|
||||
"params" to mapOf(
|
||||
"active_only" to true,
|
||||
"fields" to mapOf(
|
||||
KEY_IP to true, KEY_PORT to true,
|
||||
KEY_X25519 to true, KEY_ED25519 to true,
|
||||
KEY_VERSION to true
|
||||
)
|
||||
)
|
||||
)
|
||||
val response = HTTP.execute(HTTP.Verb.POST, url, parameters, useSeedNodeConnection = true)
|
||||
val json = try {
|
||||
JsonUtil.fromJson(response, Map::class.java)
|
||||
} catch (exception: Exception) {
|
||||
mapOf( "result" to response.toString())
|
||||
}
|
||||
val url = "$target/json_rpc"
|
||||
val response = HTTP.execute(HTTP.Verb.POST, url, GET_RANDOM_SNODE_PARAMS, useSeedNodeConnection = true)
|
||||
val json = runCatching { JsonUtil.fromJson(response, Map::class.java) }.getOrNull()
|
||||
?: buildMap { this["result"] = response.toString() }
|
||||
val intermediate = json["result"] as? Map<*, *> ?: throw Error.Generic
|
||||
.also { Log.d("Loki", "Failed to update snode pool, intermediate was null.") }
|
||||
val rawSnodes = intermediate["service_node_states"] as? List<*> ?: throw Error.Generic
|
||||
@ -180,7 +177,7 @@ object SnodeAPI {
|
||||
).also { if (it == null) Log.d("Loki", "Failed to parse: ${rawSnode.prettifiedDescription()}.") }
|
||||
}.toSet().also {
|
||||
Log.d("Loki", "Persisting snode pool to database.")
|
||||
this.snodePool = it
|
||||
snodePool = it
|
||||
}.runCatching { getRandomElement() }.onFailure {
|
||||
Log.d("Loki", "Got an empty snode pool from: $target.")
|
||||
throw SnodeAPI.Error.Generic
|
||||
@ -220,10 +217,13 @@ object SnodeAPI {
|
||||
}
|
||||
val base64EncodedNameHash = Base64.encodeBytes(nameHash)
|
||||
// Ask 3 different snodes for the Account ID associated with the given name hash
|
||||
val parameters = mapOf(
|
||||
"endpoint" to "ons_resolve",
|
||||
"params" to mapOf( "type" to 0, "name_hash" to base64EncodedNameHash )
|
||||
)
|
||||
val parameters = buildMap<String, Any> {
|
||||
this["endpoint"] = "ons_resolve"
|
||||
this["params"] = buildMap {
|
||||
this["type"] = 0
|
||||
this["name_hash"] = base64EncodedNameHash
|
||||
}
|
||||
}
|
||||
val promises = List(validationCount) {
|
||||
getRandomSnode().bind { snode ->
|
||||
retryIfNeeded(maxRetryCount) {
|
||||
@ -232,10 +232,9 @@ object SnodeAPI {
|
||||
}
|
||||
}
|
||||
return all(promises).map { results ->
|
||||
val accountIDs = mutableListOf<String>()
|
||||
for (json in results) {
|
||||
val intermediate = json["result"] as? Map<*, *>
|
||||
val hexEncodedCiphertext = intermediate?.get("encrypted_value") as? String ?: throw Error.Generic
|
||||
results.map { json ->
|
||||
val intermediate = json["result"] as? Map<*, *> ?: throw Error.Generic
|
||||
val hexEncodedCiphertext = intermediate["encrypted_value"] as? String ?: throw Error.Generic
|
||||
val ciphertext = Hex.fromStringCondensed(hexEncodedCiphertext)
|
||||
val isArgon2Based = (intermediate["nonce"] == null)
|
||||
if (isArgon2Based) {
|
||||
@ -251,7 +250,7 @@ object SnodeAPI {
|
||||
if (!sodium.cryptoSecretBoxOpenEasy(accountIDAsData, ciphertext, ciphertext.size.toLong(), nonce, key)) {
|
||||
throw Error.DecryptionFailed
|
||||
}
|
||||
accountIDs.add(Hex.toStringCondensed(accountIDAsData))
|
||||
Hex.toStringCondensed(accountIDAsData)
|
||||
} else {
|
||||
val hexEncodedNonce = intermediate["nonce"] as? String ?: throw Error.Generic
|
||||
val nonce = Hex.fromStringCondensed(hexEncodedNonce)
|
||||
@ -263,10 +262,9 @@ object SnodeAPI {
|
||||
if (!sodium.cryptoAeadXChaCha20Poly1305IetfDecrypt(accountIDAsData, null, null, ciphertext, ciphertext.size.toLong(), null, 0, nonce, key)) {
|
||||
throw Error.DecryptionFailed
|
||||
}
|
||||
accountIDs.add(Hex.toStringCondensed(accountIDAsData))
|
||||
Hex.toStringCondensed(accountIDAsData)
|
||||
}
|
||||
}
|
||||
accountIDs.takeIf { it.size == validationCount && it.toSet().size == 1 }?.first()
|
||||
}.takeIf { it.size == validationCount && it.toSet().size == 1 }?.first()
|
||||
?: throw Error.ValidationFailed
|
||||
}
|
||||
}
|
||||
@ -274,30 +272,31 @@ object SnodeAPI {
|
||||
fun getSwarm(publicKey: String): Promise<Set<Snode>, Exception> =
|
||||
database.getSwarm(publicKey)?.takeIf { it.size >= minimumSwarmSnodeCount }?.let(Promise.Companion::of)
|
||||
?: getRandomSnode().bind {
|
||||
invoke(Snode.Method.GetSwarm, it, parameters = mapOf( "pubKey" to publicKey ), publicKey)
|
||||
invoke(Snode.Method.GetSwarm, it, parameters = buildMap { this["pubKey"] = publicKey }, publicKey)
|
||||
}.map {
|
||||
parseSnodes(it).toSet()
|
||||
}.success {
|
||||
database.setSwarm(publicKey, it)
|
||||
}
|
||||
|
||||
private fun signAndEncode(data: ByteArray, userED25519KeyPair: KeyPair) = sign(data, userED25519KeyPair).let(Base64::encodeBytes)
|
||||
private fun signAndEncodeCatching(data: ByteArray, userED25519KeyPair: KeyPair): Result<String> =
|
||||
runCatching { signAndEncode(data, userED25519KeyPair) }
|
||||
private fun signAndEncode(data: ByteArray, userED25519KeyPair: KeyPair): String =
|
||||
sign(data, userED25519KeyPair).let(Base64::encodeBytes)
|
||||
private fun sign(data: ByteArray, userED25519KeyPair: KeyPair): ByteArray = ByteArray(Sign.BYTES).also {
|
||||
sodium.cryptoSignDetached(
|
||||
it,
|
||||
data,
|
||||
data.size.toLong(),
|
||||
userED25519KeyPair.secretKey.asBytes
|
||||
)
|
||||
sodium.cryptoSignDetached(it, data, data.size.toLong(), userED25519KeyPair.secretKey.asBytes)
|
||||
}
|
||||
|
||||
fun getRawMessages(snode: Snode, publicKey: String, requiresAuth: Boolean = true, namespace: Int = 0): RawResponsePromise {
|
||||
// Get last message hash
|
||||
val lastHashValue = database.getLastMessageHashValue(snode, publicKey, namespace) ?: ""
|
||||
val parameters = mutableMapOf<String, Any>(
|
||||
"pubKey" to publicKey,
|
||||
"last_hash" to lastHashValue,
|
||||
)
|
||||
val parameters = buildMutableMap<String, Any> {
|
||||
this["pubKey"] = publicKey
|
||||
this["last_hash"] = lastHashValue
|
||||
// If the namespace is default (0) here it will be implicitly read as 0 on the storage server
|
||||
// we only need to specify it explicitly if we want to (in future) or if it is non-zero
|
||||
namespace.takeIf { it != 0 }?.let { this["namespace"] = it }
|
||||
}
|
||||
// Construct signature
|
||||
if (requiresAuth) {
|
||||
val userED25519KeyPair = try {
|
||||
@ -311,23 +310,13 @@ object SnodeAPI {
|
||||
val ed25519PublicKey = userED25519KeyPair.publicKey.asHexString
|
||||
val verificationData = buildString {
|
||||
append("retrieve")
|
||||
if (namespace != 0) append(namespace)
|
||||
namespace.takeIf { it != 0 }?.let(::append)
|
||||
append(timestamp)
|
||||
}.toByteArray()
|
||||
val signature = try {
|
||||
signAndEncode(verificationData, userED25519KeyPair)
|
||||
} catch (exception: Exception) {
|
||||
return Promise.ofFail(Error.SigningFailed)
|
||||
}
|
||||
parameters["signature"] = signAndEncodeCatching(verificationData, userED25519KeyPair).getOrNull()
|
||||
?: return Promise.ofFail(Error.SigningFailed)
|
||||
parameters["timestamp"] = timestamp
|
||||
parameters["pubkey_ed25519"] = ed25519PublicKey
|
||||
parameters["signature"] = signature
|
||||
}
|
||||
|
||||
// If the namespace is default (0) here it will be implicitly read as 0 on the storage server
|
||||
// we only need to specify it explicitly if we want to (in future) or if it is non-zero
|
||||
if (namespace != 0) {
|
||||
parameters["namespace"] = namespace
|
||||
}
|
||||
|
||||
// Make the request
|
||||
@ -341,11 +330,8 @@ object SnodeAPI {
|
||||
val userED25519KeyPair = runCatching { MessagingModuleConfiguration.shared.getUserED25519KeyPair() }.getOrNull() ?: return null
|
||||
|
||||
val verificationData = "store$namespace$messageTimestamp".toByteArray()
|
||||
val signature = try {
|
||||
signAndEncode(verificationData, userED25519KeyPair)
|
||||
} catch (e: Exception) {
|
||||
Log.e("Loki", "Signing data failed with user secret key", e)
|
||||
return null
|
||||
val signature = signAndEncodeCatching(verificationData, userED25519KeyPair).run {
|
||||
getOrNull() ?: return null.also { Log.e("Loki", "Signing data failed with user secret key", exceptionOrNull()) }
|
||||
}
|
||||
|
||||
val params = buildMap {
|
||||
@ -478,13 +464,13 @@ object SnodeAPI {
|
||||
Log.e("Loki", "Signing data failed with user secret key", e)
|
||||
return@retryIfNeeded Promise.ofFail(e)
|
||||
}
|
||||
val params = mapOf(
|
||||
"pubkey" to publicKey,
|
||||
"messages" to hashes,
|
||||
"timestamp" to timestamp,
|
||||
"pubkey_ed25519" to ed25519PublicKey,
|
||||
"signature" to signature
|
||||
)
|
||||
val params = buildMap {
|
||||
this["pubkey"] = publicKey
|
||||
this["messages"] = hashes
|
||||
this["timestamp"] = timestamp
|
||||
this["pubkey_ed25519"] = ed25519PublicKey
|
||||
this["signature"] = signature
|
||||
}
|
||||
getSingleTargetSnode(publicKey) bind { snode ->
|
||||
invoke(Snode.Method.GetExpiries, snode, params, publicKey)
|
||||
}
|
||||
@ -578,7 +564,7 @@ object SnodeAPI {
|
||||
}
|
||||
}
|
||||
|
||||
fun deleteMessage(publicKey: String, serverHashes: List<String>): Promise<Map<String,Boolean>, Exception> =
|
||||
fun deleteMessage(publicKey: String, serverHashes: List<String>): Promise<Map<String, Boolean>, Exception> =
|
||||
retryIfNeeded(maxRetryCount) {
|
||||
val module = MessagingModuleConfiguration.shared
|
||||
val userED25519KeyPair = module.getUserED25519KeyPair() ?: return@retryIfNeeded Promise.ofFail(Error.NoKeyPair)
|
||||
@ -586,35 +572,40 @@ object SnodeAPI {
|
||||
getSingleTargetSnode(publicKey).bind { snode ->
|
||||
retryIfNeeded(maxRetryCount) {
|
||||
val verificationData = sequenceOf(Snode.Method.DeleteMessage.rawValue).plus(serverHashes).toByteArray()
|
||||
val deleteMessageParams = mapOf(
|
||||
"pubkey" to userPublicKey,
|
||||
"pubkey_ed25519" to userED25519KeyPair.publicKey.asHexString,
|
||||
"messages" to serverHashes,
|
||||
"signature" to signAndEncode(verificationData, userED25519KeyPair)
|
||||
)
|
||||
val deleteMessageParams = buildMap {
|
||||
this["pubkey"] = userPublicKey
|
||||
this["pubkey_ed25519"] = userED25519KeyPair.publicKey.asHexString
|
||||
this["messages"] = serverHashes
|
||||
this["signature"] = signAndEncode(verificationData, userED25519KeyPair)
|
||||
}
|
||||
invoke(Snode.Method.DeleteMessage, snode, deleteMessageParams, publicKey).map { rawResponse ->
|
||||
val swarms = rawResponse["swarm"] as? Map<String, Any> ?: return@map mapOf()
|
||||
val result = swarms.mapNotNull { (hexSnodePublicKey, rawJSON) ->
|
||||
val json = rawJSON as? Map<String, Any> ?: return@mapNotNull null
|
||||
val isFailed = json["failed"] as? Boolean ?: false
|
||||
val statusCode = json["code"] as? String
|
||||
val reason = json["reason"] as? String
|
||||
hexSnodePublicKey to if (isFailed) {
|
||||
Log.e("Loki", "Failed to delete messages from: $hexSnodePublicKey due to error: $reason ($statusCode).")
|
||||
false
|
||||
} else {
|
||||
val hashes = json["deleted"] as List<String> // Hashes of deleted messages
|
||||
val signature = json["signature"] as String
|
||||
val snodePublicKey = Key.fromHexString(hexSnodePublicKey)
|
||||
// The signature looks like ( PUBKEY_HEX || RMSG[0] || ... || RMSG[N] || DMSG[0] || ... || DMSG[M] )
|
||||
val message = sequenceOf(userPublicKey).plus(serverHashes).plus(hashes).toByteArray()
|
||||
sodium.cryptoSignVerifyDetached(Base64.decode(signature), message, message.size, snodePublicKey.asBytes)
|
||||
swarms.mapValuesNotNull { (hexSnodePublicKey, rawJSON) ->
|
||||
(rawJSON as? Map<String, Any>)?.let { json ->
|
||||
val isFailed = json["failed"] as? Boolean ?: false
|
||||
val statusCode = json["code"] as? String
|
||||
val reason = json["reason"] as? String
|
||||
|
||||
if (isFailed) {
|
||||
Log.e("Loki", "Failed to delete messages from: $hexSnodePublicKey due to error: $reason ($statusCode).")
|
||||
false
|
||||
} else {
|
||||
// Hashes of deleted messages
|
||||
val hashes = json["deleted"] as List<String>
|
||||
val signature = json["signature"] as String
|
||||
val snodePublicKey = Key.fromHexString(hexSnodePublicKey)
|
||||
// The signature looks like ( PUBKEY_HEX || RMSG[0] || ... || RMSG[N] || DMSG[0] || ... || DMSG[M] )
|
||||
val message = sequenceOf(userPublicKey).plus(serverHashes).plus(hashes).toByteArray()
|
||||
sodium.cryptoSignVerifyDetached(
|
||||
Base64.decode(signature),
|
||||
message,
|
||||
message.size,
|
||||
snodePublicKey.asBytes
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
return@map result.toMap()
|
||||
}.fail { e ->
|
||||
Log.e("Loki", "Failed to delete messages", e)
|
||||
}
|
||||
}.fail { e -> Log.e("Loki", "Failed to delete messages", e) }
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -643,18 +634,16 @@ object SnodeAPI {
|
||||
retryIfNeeded(maxRetryCount) {
|
||||
getNetworkTime(snode).bind { (_, timestamp) ->
|
||||
val verificationData = (Snode.Method.DeleteAll.rawValue + Namespace.ALL + timestamp.toString()).toByteArray()
|
||||
val deleteMessageParams = mapOf(
|
||||
"pubkey" to userPublicKey,
|
||||
"pubkey_ed25519" to userED25519KeyPair.publicKey.asHexString,
|
||||
"timestamp" to timestamp,
|
||||
"signature" to signAndEncode(verificationData, userED25519KeyPair),
|
||||
"namespace" to Namespace.ALL,
|
||||
)
|
||||
invoke(Snode.Method.DeleteAll, snode, deleteMessageParams, userPublicKey).map {
|
||||
rawResponse -> parseDeletions(userPublicKey, timestamp, rawResponse)
|
||||
}.fail { e ->
|
||||
Log.e("Loki", "Failed to clear data", e)
|
||||
val deleteMessageParams = buildMap {
|
||||
this["pubkey"] = userPublicKey
|
||||
this["pubkey_ed25519"] = userED25519KeyPair.publicKey.asHexString
|
||||
this["timestamp"] = timestamp
|
||||
this["signature"] = signAndEncode(verificationData, userED25519KeyPair)
|
||||
this["namespace"] = Namespace.ALL
|
||||
}
|
||||
invoke(Snode.Method.DeleteAll, snode, deleteMessageParams, userPublicKey)
|
||||
.map { rawResponse -> parseDeletions(userPublicKey, timestamp, rawResponse) }
|
||||
.fail { e -> Log.e("Loki", "Failed to clear data", e) }
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -662,69 +651,53 @@ object SnodeAPI {
|
||||
|
||||
fun parseRawMessagesResponse(rawResponse: RawResponse, snode: Snode, publicKey: String, namespace: Int = 0, updateLatestHash: Boolean = true, updateStoredHashes: Boolean = true): List<Pair<SignalServiceProtos.Envelope, String?>> =
|
||||
(rawResponse["messages"] as? List<*>)?.let { messages ->
|
||||
if (updateLatestHash) {
|
||||
updateLastMessageHashValueIfPossible(snode, publicKey, messages, namespace)
|
||||
}
|
||||
if (updateLatestHash) updateLastMessageHashValueIfPossible(snode, publicKey, messages, namespace)
|
||||
removeDuplicates(publicKey, messages, namespace, updateStoredHashes).let(::parseEnvelopes)
|
||||
} ?: listOf()
|
||||
|
||||
fun updateLastMessageHashValueIfPossible(snode: Snode, publicKey: String, rawMessages: List<*>, namespace: Int) {
|
||||
val lastMessageAsJSON = rawMessages.lastOrNull() as? Map<*, *>
|
||||
val hashValue = lastMessageAsJSON?.get("hash") as? String
|
||||
if (hashValue != null) {
|
||||
database.setLastMessageHashValue(snode, publicKey, hashValue, namespace)
|
||||
} else if (rawMessages.isNotEmpty()) {
|
||||
Log.d("Loki", "Failed to update last message hash value from: ${rawMessages.prettifiedDescription()}.")
|
||||
when {
|
||||
hashValue != null -> database.setLastMessageHashValue(snode, publicKey, hashValue, namespace)
|
||||
rawMessages.isNotEmpty() -> Log.d("Loki", "Failed to update last message hash value from: ${rawMessages.prettifiedDescription()}.")
|
||||
}
|
||||
}
|
||||
|
||||
fun removeDuplicates(publicKey: String, rawMessages: List<*>, namespace: Int, updateStoredHashes: Boolean): List<*> {
|
||||
val originalMessageHashValues = database.getReceivedMessageHashValues(publicKey, namespace) ?: emptySet()
|
||||
val receivedMessageHashValues = originalMessageHashValues.toMutableSet()
|
||||
val result = rawMessages.filter { rawMessage ->
|
||||
return rawMessages.filter { rawMessage ->
|
||||
(rawMessage as? Map<*, *>)
|
||||
?.let { it["hash"] as? String }
|
||||
?.let { receivedMessageHashValues.add(it) }
|
||||
?: false.also { Log.d("Loki", "Missing hash value for message: ${rawMessage?.prettifiedDescription()}.") }
|
||||
}
|
||||
if (updateStoredHashes && originalMessageHashValues.containsAll(receivedMessageHashValues)) {
|
||||
database.setReceivedMessageHashValues(publicKey, receivedMessageHashValues, namespace)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
private fun parseEnvelopes(rawMessages: List<*>): List<Pair<SignalServiceProtos.Envelope, String?>> =
|
||||
rawMessages.mapNotNull { rawMessage ->
|
||||
val rawMessageAsJSON = rawMessage as? Map<*, *>
|
||||
val base64EncodedData = rawMessageAsJSON?.get("data") as? String
|
||||
val data = base64EncodedData?.let { Base64.decode(it) }
|
||||
|
||||
data?.runCatching(MessageWrapper::unwrap)
|
||||
?.map { it to rawMessageAsJSON["hash"] as? String }
|
||||
?.onFailure { Log.d("Loki", "Failed to unwrap data for message: ${rawMessage.prettifiedDescription()}.") }
|
||||
|
||||
if (data != null) {
|
||||
try {
|
||||
MessageWrapper.unwrap(data) to rawMessageAsJSON["hash"] as? String
|
||||
} catch (e: Exception) {
|
||||
Log.d("Loki", "Failed to unwrap data for message: ${rawMessage.prettifiedDescription()}.")
|
||||
null
|
||||
}
|
||||
} else {
|
||||
Log.d("Loki", "Failed to decode data for message: ${rawMessage?.prettifiedDescription()}.")
|
||||
null
|
||||
}.also {
|
||||
if (updateStoredHashes && originalMessageHashValues.containsAll(receivedMessageHashValues)) {
|
||||
database.setReceivedMessageHashValues(publicKey, receivedMessageHashValues, namespace)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun parseEnvelopes(rawMessages: List<*>): List<Pair<SignalServiceProtos.Envelope, String?>> = rawMessages.mapNotNull { rawMessage ->
|
||||
val rawMessageAsJSON = rawMessage as? Map<*, *>
|
||||
val base64EncodedData = rawMessageAsJSON?.get("data") as? String
|
||||
val data = base64EncodedData?.let { Base64.decode(it) }
|
||||
|
||||
data ?: Log.d("Loki", "Failed to decode data for message: ${rawMessage?.prettifiedDescription()}.")
|
||||
|
||||
data?.runCatching { MessageWrapper.unwrap(this) to rawMessageAsJSON["hash"] as? String }
|
||||
?.onFailure { Log.d("Loki", "Failed to unwrap data for message: ${rawMessage.prettifiedDescription()}.") }
|
||||
?.getOrNull()
|
||||
}
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
private fun parseDeletions(userPublicKey: String, timestamp: Long, rawResponse: RawResponse): Map<String, Boolean> {
|
||||
val swarms = rawResponse["swarm"] as? Map<String, Any> ?: return mapOf()
|
||||
return swarms.mapNotNull { (hexSnodePublicKey, rawJSON) ->
|
||||
val json = rawJSON as? Map<String, Any> ?: return@mapNotNull null
|
||||
val isFailed = json["failed"] as? Boolean ?: false
|
||||
val statusCode = json["code"] as? String
|
||||
val reason = json["reason"] as? String
|
||||
hexSnodePublicKey to if (isFailed) {
|
||||
private fun parseDeletions(userPublicKey: String, timestamp: Long, rawResponse: RawResponse): Map<String, Boolean> =
|
||||
(rawResponse["swarm"] as? Map<String, Any>)?.mapValuesNotNull { (hexSnodePublicKey, rawJSON) ->
|
||||
val json = rawJSON as? Map<String, Any> ?: return@mapValuesNotNull null
|
||||
if (json["failed"] as? Boolean == true) {
|
||||
val reason = json["reason"] as? String
|
||||
val statusCode = json["code"] as? String
|
||||
Log.e("Loki", "Failed to delete all messages from: $hexSnodePublicKey due to error: $reason ($statusCode).")
|
||||
false
|
||||
} else {
|
||||
@ -735,8 +708,7 @@ object SnodeAPI {
|
||||
val message = sequenceOf(userPublicKey, "$timestamp").plus(hashes).toByteArray()
|
||||
sodium.cryptoSignVerifyDetached(Base64.decode(signature), message, message.size, snodePublicKey.asBytes)
|
||||
}
|
||||
}.toMap()
|
||||
}
|
||||
} ?: mapOf()
|
||||
|
||||
// endregion
|
||||
|
||||
@ -752,7 +724,7 @@ object SnodeAPI {
|
||||
publicKey?.let { dropSnodeFromSwarmIfNeeded(snode, it) }
|
||||
snodePool -= snode
|
||||
Log.d("Loki", "Snode pool count: ${snodePool.count()}.")
|
||||
snodeFailureCount[snode] = 0
|
||||
snodeFailureCount.remove(snode)
|
||||
}
|
||||
}
|
||||
when (statusCode) {
|
||||
@ -765,15 +737,14 @@ object SnodeAPI {
|
||||
}
|
||||
421 -> {
|
||||
// The snode isn't associated with the given public key anymore
|
||||
if (publicKey != null) {
|
||||
json?.let(::parseSnodes)
|
||||
?.takeIf { it.isNotEmpty() }
|
||||
?.let { database.setSwarm(publicKey, it.toSet()) }
|
||||
?: run {
|
||||
Log.d("Loki", "Invalidating swarm for: $publicKey.")
|
||||
dropSnodeFromSwarmIfNeeded(snode, publicKey)
|
||||
}
|
||||
} else Log.d("Loki", "Got a 421 without an associated public key.")
|
||||
if (publicKey == null) Log.d("Loki", "Got a 421 without an associated public key.")
|
||||
else json?.let(::parseSnodes)
|
||||
?.takeIf { it.isNotEmpty() }
|
||||
?.let { database.setSwarm(publicKey, it.toSet()) }
|
||||
?: run {
|
||||
Log.d("Loki", "Invalidating swarm for: $publicKey.")
|
||||
dropSnodeFromSwarmIfNeeded(snode, publicKey)
|
||||
}
|
||||
}
|
||||
404 -> {
|
||||
Log.d("Loki", "404, probably no file found")
|
||||
|
@ -402,6 +402,12 @@ fun <E, K: Any, V: Any> Iterable<E>.associateByNotNull(
|
||||
for(e in this) { it[keySelector(e) ?: continue] = valueTransform(e) ?: continue }
|
||||
}
|
||||
|
||||
fun <K: Any, V: Any, W : Any> Map<K, V>.mapValuesNotNull(
|
||||
valueTransform: (Map.Entry<K, V>) -> W?
|
||||
): Map<K, W> = mutableMapOf<K, W>().also {
|
||||
for(e in this) { it[e.key] = valueTransform(e) ?: continue }
|
||||
}
|
||||
|
||||
/**
|
||||
* Groups elements of the original collection by the key returned by the given [keySelector] function
|
||||
* applied to each element and returns a map where each group key is associated with a list of
|
||||
@ -413,6 +419,21 @@ inline fun <E, K> Iterable<E>.groupByNotNull(keySelector: (E) -> K?): Map<K, Lis
|
||||
forEach { e -> keySelector(e)?.let { k -> it.getOrPut(k) { mutableListOf() } += e } }
|
||||
}
|
||||
|
||||
/**
|
||||
* Analogous to [buildMap], this function creates a [MutableMap] and populates it using the given [action].
|
||||
*/
|
||||
inline fun <K, V> buildMutableMap(action: MutableMap<K, V>.() -> Unit): MutableMap<K, V> =
|
||||
mutableMapOf<K, V>().apply(action)
|
||||
|
||||
/**
|
||||
* Converts a list of Pairs into a Map, filtering out any Pairs where the value is null.
|
||||
*
|
||||
* @param pairs The list of Pairs to convert.
|
||||
* @return A Map with non-null values.
|
||||
*/
|
||||
fun <K : Any, V : Any> Iterable<Pair<K, V?>>.toMapNotNull(): Map<K, V> =
|
||||
associateByNotNull(Pair<K, V?>::first, Pair<K, V?>::second)
|
||||
|
||||
fun Sequence<String>.toByteArray(): ByteArray = ByteArrayOutputStream().use { output ->
|
||||
forEach { it.byteInputStream().use { input -> input.copyTo(output) } }
|
||||
output.toByteArray()
|
||||
|
Loading…
Reference in New Issue
Block a user