Namespace retrieval and storage with auth (#880)

* feat: add migration and fork info for upcoming auth and closed group retrieval updates

* feat: add closed group poller calls and include namespace to parse raw messages function

* feat: add DB upgrades and queries for namespaces

* fix: fix the polling for post-HF signatures and group messages

* fix: realise we need a compound key for namespaces in received hashes, test explicitly setting namespace

* feat: add setForkInfo implementation

* refactor: include default fork info command on create, refactor migration to use new table since we can't add constraints in alter for PK, replace `lastHash` with `last_hash` in case that fixes paging

* refactor: include namespace and use when statement for closed group polling

* refactor: revert to main net

* refactor: use namespace constants

* refactor: revert to testnet and log the poll result

* fix: use or to log either poller

* fix: revert to default network and add more logging, only set the latest fork info if it is an increment

* build: update minor version

* refactor: use single target snode and namespace list for message sending

* fix: link previews and expiring messages in closed groups
This commit is contained in:
Harris
2022-05-18 10:20:57 +10:00
committed by GitHub
parent 7fc3599c25
commit 00f06ab034
14 changed files with 325 additions and 125 deletions

View File

@@ -8,9 +8,17 @@ import org.session.libsession.messaging.jobs.MessageSendJob
import org.session.libsession.messaging.jobs.NotifyPNServerJob
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.control.*
import org.session.libsession.messaging.messages.visible.*
import org.session.libsession.messaging.open_groups.*
import org.session.libsession.messaging.messages.control.CallMessage
import org.session.libsession.messaging.messages.control.ClosedGroupControlMessage
import org.session.libsession.messaging.messages.control.ConfigurationMessage
import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
import org.session.libsession.messaging.messages.control.UnsendRequest
import org.session.libsession.messaging.messages.visible.LinkPreview
import org.session.libsession.messaging.messages.visible.Profile
import org.session.libsession.messaging.messages.visible.Quote
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
import org.session.libsession.messaging.open_groups.OpenGroupMessageV2
import org.session.libsession.messaging.utilities.MessageWrapper
import org.session.libsession.snode.RawResponsePromise
import org.session.libsession.snode.SnodeAPI
@@ -22,8 +30,11 @@ import org.session.libsession.utilities.SSKEnvironment
import org.session.libsignal.crypto.PushTransportDetails
import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import org.session.libsignal.utilities.defaultRequiresAuth
import org.session.libsignal.utilities.hasNamespaces
import org.session.libsignal.utilities.hexEncodedPublicKey
import java.util.concurrent.atomic.AtomicInteger
import org.session.libsession.messaging.sending_receiving.attachments.Attachment as SignalAttachment
import org.session.libsession.messaging.sending_receiving.link_preview.LinkPreview as SignalLinkPreview
import org.session.libsession.messaging.sending_receiving.quotes.QuoteModel as SignalQuote
@@ -125,6 +136,15 @@ object MessageSender {
// Wrap the result
val kind: SignalServiceProtos.Envelope.Type
val senderPublicKey: String
// TODO: this might change in future for config messages
val forkInfo = SnodeAPI.forkInfo
val namespaces: List<Int> = when {
destination is Destination.ClosedGroup
&& forkInfo.defaultRequiresAuth() -> listOf(Namespace.UNAUTHENTICATED_CLOSED_GROUP)
destination is Destination.ClosedGroup
&& forkInfo.hasNamespaces() -> listOf(Namespace.UNAUTHENTICATED_CLOSED_GROUP, Namespace.DEFAULT)
else -> listOf(Namespace.DEFAULT)
}
when (destination) {
is Destination.Contact -> {
kind = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE
@@ -148,11 +168,11 @@ object MessageSender {
if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend) {
SnodeModule.shared.broadcaster.broadcast("sendingMessage", message.sentTimestamp!!)
}
SnodeAPI.sendMessage(snodeMessage).success { promises: Set<RawResponsePromise> ->
namespaces.map { namespace -> SnodeAPI.sendMessage(snodeMessage, requiresAuth = false, namespace = namespace) }.let { promises ->
var isSuccess = false
val promiseCount = promises.size
var errorCount = 0
promises.iterator().forEach { promise: RawResponsePromise ->
var errorCount = AtomicInteger(0)
promises.forEach { promise: RawResponsePromise ->
promise.success {
if (isSuccess) { return@success } // Succeed as soon as the first promise succeeds
isSuccess = true
@@ -162,7 +182,7 @@ object MessageSender {
val hash = it["hash"] as? String
message.serverHash = hash
handleSuccessfulMessageSend(message, destination, isSyncMessage)
var shouldNotify = ((message is VisibleMessage || message is UnsendRequest || message is CallMessage) && !isSyncMessage)
val shouldNotify = ((message is VisibleMessage || message is UnsendRequest || message is CallMessage) && !isSyncMessage)
/*
if (message is ClosedGroupControlMessage && message.kind is ClosedGroupControlMessage.Kind.New) {
shouldNotify = true
@@ -175,14 +195,11 @@ object MessageSender {
deferred.resolve(Unit)
}
promise.fail {
errorCount += 1
if (errorCount != promiseCount) { return@fail } // Only error out if all promises failed
errorCount.getAndIncrement()
if (errorCount.get() != promiseCount) { return@fail } // Only error out if all promises failed
handleFailure(it)
}
}
}.fail {
Log.d("Loki", "Couldn't send message due to error: $it.")
handleFailure(it)
}
} catch (exception: Exception) {
handleFailure(exception)

View File

@@ -3,16 +3,20 @@ package org.session.libsession.messaging.sending_receiving.pollers
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map
import nl.komponents.kovenant.task
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.BatchMessageReceiveJob
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.jobs.MessageReceiveParameters
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.utilities.GroupUtil
import org.session.libsignal.crypto.getRandomElementOrNull
import org.session.libsignal.utilities.Log
import java.util.*
import org.session.libsignal.utilities.Namespace
import org.session.libsignal.utilities.defaultRequiresAuth
import org.session.libsignal.utilities.hasNamespaces
import java.text.DateFormat
import java.util.Date
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
@@ -98,7 +102,26 @@ class ClosedGroupPollerV2 {
val promise = SnodeAPI.getSwarm(groupPublicKey).bind { swarm ->
val snode = swarm.getRandomElementOrNull() ?: throw InsufficientSnodesException() // Should be cryptographically secure
if (!isPolling(groupPublicKey)) { throw PollingCanceledException() }
SnodeAPI.getRawMessages(snode, groupPublicKey).map { SnodeAPI.parseRawMessagesResponse(it, snode, groupPublicKey) }
val currentForkInfo = SnodeAPI.forkInfo
when {
currentForkInfo.defaultRequiresAuth() -> SnodeAPI.getRawMessages(snode, groupPublicKey, requiresAuth = false, namespace = Namespace.UNAUTHENTICATED_CLOSED_GROUP)
.map { SnodeAPI.parseRawMessagesResponse(it, snode, groupPublicKey, Namespace.UNAUTHENTICATED_CLOSED_GROUP) }
currentForkInfo.hasNamespaces() -> task {
val unAuthed = SnodeAPI.getRawMessages(snode, groupPublicKey, requiresAuth = false, namespace = Namespace.UNAUTHENTICATED_CLOSED_GROUP)
.map { SnodeAPI.parseRawMessagesResponse(it, snode, groupPublicKey, Namespace.UNAUTHENTICATED_CLOSED_GROUP) }
val default = SnodeAPI.getRawMessages(snode, groupPublicKey, requiresAuth = false, namespace = Namespace.DEFAULT)
.map { SnodeAPI.parseRawMessagesResponse(it, snode, groupPublicKey, Namespace.DEFAULT) }
val unAuthedResult = unAuthed.get()
val defaultResult = default.get()
val format = DateFormat.getTimeInstance()
if (unAuthedResult.isNotEmpty() || defaultResult.isNotEmpty()) {
Log.d("Poller", "@${format.format(Date())}Polled ${unAuthedResult.size} from -10, ${defaultResult.size} from 0")
}
unAuthedResult + defaultResult
}
else -> SnodeAPI.getRawMessages(snode, groupPublicKey, requiresAuth = false, namespace = Namespace.DEFAULT)
.map { SnodeAPI.parseRawMessagesResponse(it, snode, groupPublicKey) }
}
}
promise.success { envelopes ->
if (!isPolling(groupPublicKey)) { return@success }

View File

@@ -8,20 +8,24 @@ import nl.komponents.kovenant.functional.map
import okhttp3.Request
import org.session.libsession.messaging.file_server.FileServerAPIV2
import org.session.libsession.utilities.AESGCM
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.*
import org.session.libsignal.utilities.Snode
import org.session.libsession.utilities.AESGCM.EncryptionResult
import org.session.libsession.utilities.getBodyForOnionRequest
import org.session.libsession.utilities.getHeadersForOnionRequest
import org.session.libsignal.crypto.getRandomElement
import org.session.libsignal.crypto.getRandomElementOrNull
import org.session.libsignal.utilities.Broadcaster
import org.session.libsignal.utilities.HTTP
import org.session.libsignal.database.LokiAPIDatabaseProtocol
import java.util.*
import kotlin.math.abs
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Broadcaster
import org.session.libsignal.utilities.ForkInfo
import org.session.libsignal.utilities.HTTP
import org.session.libsignal.utilities.JsonUtil
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Snode
import org.session.libsignal.utilities.ThreadUtils
import org.session.libsignal.utilities.recover
import org.session.libsignal.utilities.toHexString
import java.util.Date
import kotlin.collections.set
private typealias Path = List<Snode>
@@ -356,6 +360,22 @@ object OnionRequestAPI {
val offset = timestamp - Date().time
SnodeAPI.clockOffset = offset
}
if (body.containsKey("hf")) {
@Suppress("UNCHECKED_CAST")
val currentHf = body["hf"] as List<Int>
if (currentHf.size < 2) {
Log.e("Loki", "Response contains fork information but doesn't have a hard and soft number")
} else {
val hf = currentHf[0]
val sf = currentHf[1]
val newForkInfo = ForkInfo(hf, sf)
if (newForkInfo > SnodeAPI.forkInfo) {
SnodeAPI.forkInfo = ForkInfo(hf,sf)
} else if (newForkInfo < SnodeAPI.forkInfo) {
Log.w("Loki", "Got a new snode info fork version that was $newForkInfo, less than current known ${SnodeAPI.forkInfo}")
}
}
}
if (statusCode != 200) {
val exception = HTTPRequestFailedAtDestinationException(statusCode, body, destination.description)
return@queue deferred.reject(exception)

View File

@@ -11,19 +11,33 @@ import com.goterl.lazysodium.interfaces.PwHash
import com.goterl.lazysodium.interfaces.SecretBox
import com.goterl.lazysodium.interfaces.Sign
import com.goterl.lazysodium.utils.Key
import nl.komponents.kovenant.*
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.all
import nl.komponents.kovenant.deferred
import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map
import nl.komponents.kovenant.task
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.utilities.MessageWrapper
import org.session.libsignal.crypto.getRandomElement
import org.session.libsignal.database.LokiAPIDatabaseProtocol
import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.utilities.*
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Broadcaster
import org.session.libsignal.utilities.HTTP
import org.session.libsignal.utilities.Hex
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Snode
import org.session.libsignal.utilities.ThreadUtils
import org.session.libsignal.utilities.prettifiedDescription
import org.session.libsignal.utilities.retryIfNeeded
import java.security.SecureRandom
import java.util.*
import kotlin.Pair
import java.util.Date
import java.util.Locale
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.collections.set
import kotlin.properties.Delegates.observable
object SnodeAPI {
private val sodium by lazy { LazySodiumAndroid(SodiumAndroid()) }
@@ -41,6 +55,12 @@ object SnodeAPI {
* user's clock is incorrect.
*/
internal var clockOffset = 0L
internal var forkInfo by observable(database.getForkInfo()) { _, oldValue, newValue ->
if (newValue > oldValue) {
Log.d("Loki", "Setting new fork info new: $newValue, old: $oldValue")
database.setForkInfo(newValue)
}
}
// Settings
private val maxRetryCount = 6
@@ -55,11 +75,10 @@ object SnodeAPI {
setOf( "https://storage.seed1.loki.network:$seedNodePort", "https://storage.seed3.loki.network:$seedNodePort", "https://public.loki.foundation:$seedNodePort" )
}
}
private val snodeFailureThreshold = 3
private val targetSwarmSnodeCount = 2
private val useOnionRequests = true
private const val snodeFailureThreshold = 3
private const val useOnionRequests = true
internal val useTestnet = false
const val useTestnet = false
// Error
internal sealed class Error(val description: String) : Exception(description) {
@@ -254,11 +273,6 @@ object SnodeAPI {
return promise
}
fun getTargetSnodes(publicKey: String): Promise<List<Snode>, Exception> {
// SecureRandom() should be cryptographically secure
return getSwarm(publicKey).map { it.shuffled(SecureRandom()).take(targetSwarmSnodeCount) }
}
fun getSwarm(publicKey: String): Promise<Set<Snode>, Exception> {
val cachedSwarm = database.getSwarm(publicKey)
if (cachedSwarm != null && cachedSwarm.size >= minimumSwarmSnodeCount) {
@@ -266,7 +280,7 @@ object SnodeAPI {
cachedSwarmCopy.addAll(cachedSwarm)
return task { cachedSwarmCopy }
} else {
val parameters = mapOf( "pubKey" to if (useTestnet) publicKey.removing05PrefixIfNeeded() else publicKey )
val parameters = mapOf( "pubKey" to publicKey )
return getRandomSnode().bind {
invoke(Snode.Method.GetSwarm, it, publicKey, parameters)
}.map {
@@ -277,28 +291,39 @@ object SnodeAPI {
}
}
fun getRawMessages(snode: Snode, publicKey: String): RawResponsePromise {
// val userED25519KeyPair = MessagingModuleConfiguration.shared.getUserED25519KeyPair() ?: return Promise.ofFail(Error.NoKeyPair)
fun getRawMessages(snode: Snode, publicKey: String, requiresAuth: Boolean = true, namespace: Int = 0): RawResponsePromise {
val userED25519KeyPair = MessagingModuleConfiguration.shared.getUserED25519KeyPair() ?: return Promise.ofFail(Error.NoKeyPair)
// Get last message hash
val lastHashValue = database.getLastMessageHashValue(snode, publicKey) ?: ""
// Construct signature
// val timestamp = Date().time + SnodeAPI.clockOffset
// val ed25519PublicKey = userED25519KeyPair.publicKey.asHexString
// val verificationData = "retrieve$timestamp".toByteArray()
// val signature = ByteArray(Sign.BYTES)
// try {
// sodium.cryptoSignDetached(signature, verificationData, verificationData.size.toLong(), userED25519KeyPair.secretKey.asBytes)
// } catch (exception: Exception) {
// return Promise.ofFail(Error.SigningFailed)
// }
// Make the request
val parameters = mapOf(
"pubKey" to if (useTestnet) publicKey.removing05PrefixIfNeeded() else publicKey,
"lastHash" to lastHashValue,
// "timestamp" to timestamp,
// "pubkey_ed25519" to ed25519PublicKey,
// "signature" to Base64.encodeBytes(signature)
val lastHashValue = database.getLastMessageHashValue(snode, publicKey, namespace) ?: ""
val parameters = mutableMapOf<String,Any>(
"pubKey" to publicKey,
"last_hash" to lastHashValue,
)
// Construct signature
if (requiresAuth) {
val timestamp = Date().time + SnodeAPI.clockOffset
val ed25519PublicKey = userED25519KeyPair.publicKey.asHexString
val signature = ByteArray(Sign.BYTES)
val verificationData =
if (namespace != 0) "retrieve$namespace$timestamp".toByteArray()
else "retrieve$timestamp".toByteArray()
try {
sodium.cryptoSignDetached(signature, verificationData, verificationData.size.toLong(), userED25519KeyPair.secretKey.asBytes)
} catch (exception: Exception) {
return Promise.ofFail(Error.SigningFailed)
}
parameters["timestamp"] = timestamp
parameters["pubkey_ed25519"] = ed25519PublicKey
parameters["signature"] = Base64.encodeBytes(signature)
}
// If the namespace is default (0) here it will be implicitly read as 0 on the storage server
// we only need to specify it explicitly if we want to (in future) or if it is non-zero
if (namespace != 0) {
parameters["namespace"] = namespace
}
// Make the request
return invoke(Snode.Method.GetMessages, snode, publicKey, parameters)
}
@@ -317,14 +342,35 @@ object SnodeAPI {
}
}
fun sendMessage(message: SnodeMessage): Promise<Set<RawResponsePromise>, Exception> {
val destination = if (useTestnet) message.recipient.removing05PrefixIfNeeded() else message.recipient
fun sendMessage(message: SnodeMessage, requiresAuth: Boolean = false, namespace: Int = 0): RawResponsePromise {
val destination = message.recipient
return retryIfNeeded(maxRetryCount) {
getTargetSnodes(destination).map { swarm ->
swarm.map { snode ->
val parameters = message.toJSON()
invoke(Snode.Method.SendMessage, snode, destination, parameters)
}.toSet()
val module = MessagingModuleConfiguration.shared
val userED25519KeyPair = module.getUserED25519KeyPair() ?: return@retryIfNeeded Promise.ofFail(Error.NoKeyPair)
val parameters = message.toJSON().toMutableMap<String,Any>()
// Construct signature
if (requiresAuth) {
val sigTimestamp = System.currentTimeMillis() + SnodeAPI.clockOffset
val ed25519PublicKey = userED25519KeyPair.publicKey.asHexString
val signature = ByteArray(Sign.BYTES)
// assume namespace here is non-zero, as zero namespace doesn't require auth
val verificationData = "store$namespace$sigTimestamp".toByteArray()
try {
sodium.cryptoSignDetached(signature, verificationData, verificationData.size.toLong(), userED25519KeyPair.secretKey.asBytes)
} catch (exception: Exception) {
return@retryIfNeeded Promise.ofFail(Error.SigningFailed)
}
parameters["sig_timestamp"] = sigTimestamp
parameters["pubkey_ed25519"] = ed25519PublicKey
parameters["signature"] = Base64.encodeBytes(signature)
}
// If the namespace is default (0) here it will be implicitly read as 0 on the storage server
// we only need to specify it explicitly if we want to (in future) or if it is non-zero
if (namespace != 0) {
parameters["namespace"] = namespace
}
getSingleTargetSnode(destination).bind { snode ->
invoke(Snode.Method.SendMessage, snode, destination, parameters)
}
}
}
@@ -426,29 +472,29 @@ object SnodeAPI {
}
}
fun parseRawMessagesResponse(rawResponse: RawResponse, snode: Snode, publicKey: String): List<Pair<SignalServiceProtos.Envelope, String?>> {
fun parseRawMessagesResponse(rawResponse: RawResponse, snode: Snode, publicKey: String, namespace: Int = 0): List<Pair<SignalServiceProtos.Envelope, String?>> {
val messages = rawResponse["messages"] as? List<*>
return if (messages != null) {
updateLastMessageHashValueIfPossible(snode, publicKey, messages)
val newRawMessages = removeDuplicates(publicKey, messages)
updateLastMessageHashValueIfPossible(snode, publicKey, messages, namespace)
val newRawMessages = removeDuplicates(publicKey, messages, namespace)
return parseEnvelopes(newRawMessages);
} else {
listOf()
}
}
private fun updateLastMessageHashValueIfPossible(snode: Snode, publicKey: String, rawMessages: List<*>) {
private fun updateLastMessageHashValueIfPossible(snode: Snode, publicKey: String, rawMessages: List<*>, namespace: Int) {
val lastMessageAsJSON = rawMessages.lastOrNull() as? Map<*, *>
val hashValue = lastMessageAsJSON?.get("hash") as? String
if (hashValue != null) {
database.setLastMessageHashValue(snode, publicKey, hashValue)
database.setLastMessageHashValue(snode, publicKey, hashValue, namespace)
} else if (rawMessages.isNotEmpty()) {
Log.d("Loki", "Failed to update last message hash value from: ${rawMessages.prettifiedDescription()}.")
}
}
private fun removeDuplicates(publicKey: String, rawMessages: List<*>): List<*> {
val receivedMessageHashValues = database.getReceivedMessageHashValues(publicKey)?.toMutableSet() ?: mutableSetOf()
private fun removeDuplicates(publicKey: String, rawMessages: List<*>, namespace: Int): List<*> {
val receivedMessageHashValues = database.getReceivedMessageHashValues(publicKey, namespace)?.toMutableSet() ?: mutableSetOf()
val result = rawMessages.filter { rawMessage ->
val rawMessageAsJSON = rawMessage as? Map<*, *>
val hashValue = rawMessageAsJSON?.get("hash") as? String
@@ -461,7 +507,7 @@ object SnodeAPI {
false
}
}
database.setReceivedMessageHashValues(publicKey, receivedMessageHashValues)
database.setReceivedMessageHashValues(publicKey, receivedMessageHashValues, namespace)
return result
}

View File

@@ -1,7 +1,5 @@
package org.session.libsession.snode
import org.session.libsignal.utilities.removing05PrefixIfNeeded
data class SnodeMessage(
/**
* The hex encoded public key of the recipient.
@@ -25,11 +23,10 @@ data class SnodeMessage(
internal fun toJSON(): Map<String, String> {
return mapOf(
"pubKey" to if (SnodeAPI.useTestnet) recipient.removing05PrefixIfNeeded() else recipient,
"pubKey" to recipient,
"data" to data,
"ttl" to ttl.toString(),
"timestamp" to timestamp.toString(),
"nonce" to ""
)
}
}