refactor: config database changes, new protos, adding in support for config base namespace queries

This commit is contained in:
0x330a
2023-01-18 15:51:29 +11:00
parent 00fe77af8a
commit 8bea5e73e6
8 changed files with 1261 additions and 163 deletions

View File

@@ -1,5 +1,9 @@
package org.session.libsession.messaging.sending_receiving.pollers
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import nl.komponents.kovenant.Deferred
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
@@ -100,21 +104,38 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
private fun poll(snode: Snode, deferred: Deferred<Unit, Exception>): Promise<Unit, Exception> {
if (!hasStarted) { return Promise.ofFail(PromiseCanceledException()) }
return SnodeAPI.getRawMessages(snode, userPublicKey).bind { rawResponse ->
isCaughtUp = true
if (deferred.promise.isDone()) {
task { Unit } // The long polling connection has been canceled; don't recurse
} else {
val messages = SnodeAPI.parseRawMessagesResponse(rawResponse, snode, userPublicKey)
val parameters = messages.map { (envelope, serverHash) ->
MessageReceiveParameters(envelope.toByteArray(), serverHash = serverHash)
}
parameters.chunked(BatchMessageReceiveJob.BATCH_DEFAULT_NUMBER).forEach { chunk ->
val job = BatchMessageReceiveJob(chunk)
JobQueue.shared.add(job)
return task {
runBlocking(Dispatchers.IO) {
// get user profile namespace
val deferredUserConfig = configFactory.userConfig?.let { currentUserConfig ->
async {
try {
val currentNetworkConfig = SnodeAPI.getRawMessages(snode, userPublicKey, namespace = currentUserConfig.configNamespace()).get()
} catch (e: Exception) {
Log.e("Poller", "Error getting current user config from network", e)
}
}
} ?: CompletableDeferred(null)
SnodeAPI.getRawMessages(snode, userPublicKey).bind { rawResponse ->
isCaughtUp = true
if (deferred.promise.isDone()) {
return@bind Promise.ofSuccess(Unit)
} else {
val messages = SnodeAPI.parseRawMessagesResponse(rawResponse, snode, userPublicKey)
val parameters = messages.map { (envelope, serverHash) ->
MessageReceiveParameters(envelope.toByteArray(), serverHash = serverHash)
}
parameters.chunked(BatchMessageReceiveJob.BATCH_DEFAULT_NUMBER).forEach { chunk ->
val job = BatchMessageReceiveJob(chunk)
JobQueue.shared.add(job)
}
poll(snode, deferred)
}
}
poll(snode, deferred)
}
}
}

View File

@@ -310,14 +310,15 @@ object SnodeAPI {
fun getRawMessages(snode: Snode, publicKey: String, requiresAuth: Boolean = true, namespace: Int = 0): RawResponsePromise {
// Get last message hash
val lastHashValue = database.getLastMessageHashValue(snode, publicKey, namespace) ?: ""
val parameters = mutableMapOf<String,Any>(
val parameters = mutableMapOf<String, Any>(
"pubKey" to publicKey,
"last_hash" to lastHashValue,
)
// Construct signature
if (requiresAuth) {
val userED25519KeyPair = try {
MessagingModuleConfiguration.shared.getUserED25519KeyPair() ?: return Promise.ofFail(Error.NoKeyPair)
MessagingModuleConfiguration.shared.getUserED25519KeyPair()
?: return Promise.ofFail(Error.NoKeyPair)
} catch (e: Exception) {
Log.e("Loki", "Error getting KeyPair", e)
return Promise.ofFail(Error.NoKeyPair)
@@ -329,7 +330,12 @@ object SnodeAPI {
if (namespace != 0) "retrieve$namespace$timestamp".toByteArray()
else "retrieve$timestamp".toByteArray()
try {
sodium.cryptoSignDetached(signature, verificationData, verificationData.size.toLong(), userED25519KeyPair.secretKey.asBytes)
sodium.cryptoSignDetached(
signature,
verificationData,
verificationData.size.toLong(),
userED25519KeyPair.secretKey.asBytes
)
} catch (exception: Exception) {
return Promise.ofFail(Error.SigningFailed)
}