feat: add batch snode calls and try to poll from all the config namespaces

This commit is contained in:
0x330a 2023-01-19 17:17:32 +11:00
parent f573c7deaf
commit 39bd0bba22
No known key found for this signature in database
GPG Key ID: 267811D6E6A2698C
4 changed files with 85 additions and 21 deletions

View File

@ -6,6 +6,7 @@ import dagger.Provides
import dagger.hilt.InstallIn
import dagger.hilt.android.qualifiers.ApplicationContext
import dagger.hilt.components.SingletonComponent
import org.session.libsession.utilities.TextSecurePreferences
import org.thoughtcrime.securesms.crypto.KeyPairUtilities
import org.thoughtcrime.securesms.database.ConfigDatabase
import javax.inject.Singleton
@ -23,7 +24,10 @@ object SessionUtilModule {
@Singleton
fun provideConfigFactory(@ApplicationContext context: Context, configDatabase: ConfigDatabase): ConfigFactory =
ConfigFactory(configDatabase) {
maybeUserEdSecretKey(context)
val localUserPublicKey = TextSecurePreferences.getLocalNumber(context)
val secretKey = maybeUserEdSecretKey(context)
if (localUserPublicKey == null || secretKey == null) null
else secretKey to localUserPublicKey
}
}

View File

@ -1,8 +1,6 @@
package org.session.libsession.messaging.sending_receiving.pollers
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import nl.komponents.kovenant.Deferred
import nl.komponents.kovenant.Promise
@ -14,9 +12,11 @@ import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.BatchMessageReceiveJob
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveParameters
import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeModule
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Snode
import java.security.SecureRandom
@ -102,28 +102,42 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
}
}
private fun processUserConfig(rawMessages: List<Pair<SignalServiceProtos.Envelope, String?>>) {
}
private fun poll(snode: Snode, deferred: Deferred<Unit, Exception>): Promise<Unit, Exception> {
if (!hasStarted) { return Promise.ofFail(PromiseCanceledException()) }
return task {
runBlocking(Dispatchers.IO) {
// get user profile namespace
val deferredUserConfig = configFactory.userConfig?.let { currentUserConfig ->
async {
try {
val currentNetworkConfig = SnodeAPI.getRawMessages(snode, userPublicKey, namespace = currentUserConfig.configNamespace()).get()
} catch (e: Exception) {
Log.e("Poller", "Error getting current user config from network", e)
}
val requests = listOfNotNull(
// get messages
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(snode, userPublicKey),
// get user config namespace
configFactory.userConfig?.let { currentUserConfig ->
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode,
userPublicKey,
currentUserConfig.configNamespace()
)
},
// get contact config namespace
configFactory.contacts?.let { currentContacts ->
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode,
userPublicKey,
currentContacts.configNamespace()
)
}
} ?: CompletableDeferred(null)
)
SnodeAPI.getRawMessages(snode, userPublicKey).bind { rawResponse ->
SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses ->
isCaughtUp = true
if (deferred.promise.isDone()) {
return@bind Promise.ofSuccess(Unit)
} else {
val messages = SnodeAPI.parseRawMessagesResponse(rawResponse, snode, userPublicKey)
val messageResponse = (rawResponses["results"] as List<*>).first() as RawResponse
val messages = SnodeAPI.parseRawMessagesResponse(messageResponse, snode, userPublicKey)
val parameters = messages.map { (envelope, serverHash) ->
MessageReceiveParameters(envelope.toByteArray(), serverHash = serverHash)
}
@ -135,7 +149,6 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
poll(snode, deferred)
}
}
}
}
}

View File

@ -33,7 +33,6 @@ import org.session.libsignal.utilities.ThreadUtils
import org.session.libsignal.utilities.prettifiedDescription
import org.session.libsignal.utilities.retryIfNeeded
import java.security.SecureRandom
import java.util.Date
import java.util.Locale
import kotlin.collections.component1
import kotlin.collections.component2
@ -93,6 +92,12 @@ object SnodeAPI {
object ValidationFailed : Error("ONS name validation failed.")
}
// Batch
data class SnodeBatchRequestInfo(
val method: String,
val params: Map<String, Any>,
) // assume signatures, pubkey and namespaces are attached in parameters if required
// Internal API
internal fun invoke(
method: Snode.Method,
@ -323,7 +328,7 @@ object SnodeAPI {
Log.e("Loki", "Error getting KeyPair", e)
return Promise.ofFail(Error.NoKeyPair)
}
val timestamp = Date().time + SnodeAPI.clockOffset
val timestamp = System.currentTimeMillis() + clockOffset
val ed25519PublicKey = userED25519KeyPair.publicKey.asHexString
val signature = ByteArray(Sign.BYTES)
val verificationData =
@ -351,7 +356,48 @@ object SnodeAPI {
}
// Make the request
return invoke(Snode.Method.GetMessages, snode, parameters, publicKey)
return invoke(Snode.Method.Retrieve, snode, parameters, publicKey)
}
fun buildAuthenticatedRetrieveBatchRequest(snode: Snode, publicKey: String, namespace: Int = 0): SnodeBatchRequestInfo? {
val lastHashValue = database.getLastMessageHashValue(snode, publicKey, namespace) ?: ""
val params = mutableMapOf<String, Any>(
"pubKey" to publicKey,
"last_hash" to lastHashValue,
)
val userEd25519KeyPair = try {
MessagingModuleConfiguration.shared.getUserED25519KeyPair() ?: return null
} catch (e: Exception) {
return null
}
val ed25519PublicKey = userEd25519KeyPair.publicKey.asHexString
val timestamp = System.currentTimeMillis() + clockOffset
val signature = ByteArray(Sign.BYTES)
val verificationData = "retrieve$namespace$timestamp".toByteArray()
try {
sodium.cryptoSignDetached(
signature,
verificationData,
verificationData.size.toLong(),
userEd25519KeyPair.secretKey.asBytes
)
} catch (e: Exception) {
return null
}
params["timestamp"] = timestamp
// params["pubkey_ed25519"] = ed25519PublicKey
params["signature"] = Base64.encodeBytes(signature)
return SnodeBatchRequestInfo(
Snode.Method.Retrieve.rawValue,
params
)
}
fun getRawBatchResponse(snode: Snode, publicKey: String, requests: List<SnodeBatchRequestInfo>): RawResponsePromise {
val parameters = mutableMapOf<String, Any>(
"requests" to requests
)
return invoke(Snode.Method.Batch, snode, parameters, publicKey)
}
fun getMessages(publicKey: String): MessageListPromise {

View File

@ -5,12 +5,13 @@ class Snode(val address: String, val port: Int, val publicKeySet: KeySet?) {
public enum class Method(val rawValue: String) {
GetSwarm("get_snodes_for_pubkey"),
GetMessages("retrieve"),
Retrieve("retrieve"),
SendMessage("store"),
DeleteMessage("delete"),
OxenDaemonRPCCall("oxend_request"),
Info("info"),
DeleteAll("delete_all")
DeleteAll("delete_all"),
Batch("batch")
}
data class KeySet(val ed25519Key: String, val x25519Key: String)