feat: expand on the config sync job, finish basic implementation to test against

This commit is contained in:
0x330a 2023-02-10 16:41:33 +11:00
parent 336604b9e5
commit acd14843b8
No known key found for this signature in database
GPG Key ID: 267811D6E6A2698C
14 changed files with 345 additions and 91 deletions

View File

@ -8,11 +8,13 @@ import org.session.libsession.messaging.BlindedIdMapping
import org.session.libsession.messaging.calls.CallMessageType import org.session.libsession.messaging.calls.CallMessageType
import org.session.libsession.messaging.contacts.Contact import org.session.libsession.messaging.contacts.Contact
import org.session.libsession.messaging.jobs.AttachmentUploadJob import org.session.libsession.messaging.jobs.AttachmentUploadJob
import org.session.libsession.messaging.jobs.ConfigurationSyncJob
import org.session.libsession.messaging.jobs.GroupAvatarDownloadJob import org.session.libsession.messaging.jobs.GroupAvatarDownloadJob
import org.session.libsession.messaging.jobs.Job import org.session.libsession.messaging.jobs.Job
import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveJob import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.jobs.MessageSendJob import org.session.libsession.messaging.jobs.MessageSendJob
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.messaging.messages.Message import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.control.ConfigurationMessage import org.session.libsession.messaging.messages.control.ConfigurationMessage
import org.session.libsession.messaging.messages.control.MessageRequestResponse import org.session.libsession.messaging.messages.control.MessageRequestResponse
@ -240,6 +242,12 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
return DatabaseComponent.get(context).sessionJobDatabase().getGroupAvatarDownloadJob(server, room) return DatabaseComponent.get(context).sessionJobDatabase().getGroupAvatarDownloadJob(server, room)
} }
override fun getConfigSyncJob(destination: Destination): Job? {
return DatabaseComponent.get(context).sessionJobDatabase().getAllPendingJobs(ConfigurationSyncJob.KEY).values.firstOrNull {
(it as? ConfigurationSyncJob)?.destination == destination
}
}
override fun resumeMessageSendJobIfNeeded(messageSendJobID: String) { override fun resumeMessageSendJobIfNeeded(messageSendJobID: String) {
val job = DatabaseComponent.get(context).sessionJobDatabase().getMessageSendJob(messageSendJobID) ?: return val job = DatabaseComponent.get(context).sessionJobDatabase().getMessageSendJob(messageSendJobID) ?: return
JobQueue.shared.resumePendingSendMessage(job) JobQueue.shared.resumePendingSendMessage(job)

View File

@ -135,6 +135,20 @@ class ConfigFactory(private val context: Context,
} }
} }
override fun getHashesFor(forConfigObject: ConfigBase): List<String> =
when (forConfigObject) {
is UserProfile -> userHashes.toList()
is Contacts -> contactsHashes.toList()
is ConversationVolatileConfig -> convoHashes.toList()
}
override fun removeHashesFor(forConfigObject: ConfigBase, deletedHashes: Set<String>) =
when (forConfigObject) {
is UserProfile -> userHashes.removeAll(deletedHashes)
is Contacts -> contactsHashes.removeAll(deletedHashes)
is ConversationVolatileConfig -> convoHashes.removeAll(deletedHashes)
}
private fun updateUser(userProfile: UserProfile) { private fun updateUser(userProfile: UserProfile) {
val (_, userPublicKey) = maybeGetUserInfo() ?: return val (_, userPublicKey) = maybeGetUserInfo() ?: return
// would love to get rid of recipient and context from this // would love to get rid of recipient and context from this

View File

@ -8,22 +8,31 @@ import network.loki.messenger.libsession_util.util.Contact
import network.loki.messenger.libsession_util.util.UserPic import network.loki.messenger.libsession_util.util.UserPic
import nl.komponents.kovenant.Promise import nl.komponents.kovenant.Promise
import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.ConfigurationSyncJob
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.messages.Destination import org.session.libsession.messaging.messages.Destination
import org.session.libsession.messaging.messages.control.ConfigurationMessage import org.session.libsession.messaging.messages.control.ConfigurationMessage
import org.session.libsession.messaging.sending_receiving.MessageSender import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.utilities.Address import org.session.libsession.utilities.Address
import org.session.libsession.utilities.TextSecurePreferences import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsignal.utilities.Log
object ConfigurationMessageUtilities { object ConfigurationMessageUtilities {
@JvmStatic @JvmStatic
fun syncConfigurationIfNeeded(context: Context) { fun syncConfigurationIfNeeded(context: Context) {
// add if check here to schedule new config job process and return early // add if check here to schedule new config job process and return early
if (ConfigBase.isNewConfigEnabled) {
// schedule job if none exist
TODO()
}
val userPublicKey = TextSecurePreferences.getLocalNumber(context) ?: return val userPublicKey = TextSecurePreferences.getLocalNumber(context) ?: return
val storage = MessagingModuleConfiguration.shared.storage
if (ConfigBase.isNewConfigEnabled) {
// don't schedule job if we already have one
val ourDestination = Destination.Contact(userPublicKey)
if (storage.getConfigSyncJob(ourDestination) != null) return
val newConfigSync = ConfigurationSyncJob(ourDestination)
Log.d("Loki", "Scheduling new ConfigurationSyncJob")
JobQueue.shared.add(newConfigSync)
return
}
val lastSyncTime = TextSecurePreferences.getLastConfigurationSyncTime(context) val lastSyncTime = TextSecurePreferences.getLastConfigurationSyncTime(context)
val now = System.currentTimeMillis() val now = System.currentTimeMillis()
if (now - lastSyncTime < 7 * 24 * 60 * 60 * 1000) return if (now - lastSyncTime < 7 * 24 * 60 * 60 * 1000) return

View File

@ -40,7 +40,8 @@ android {
dependencies { dependencies {
testImplementation 'junit:junit:4.13.2' testImplementation 'junit:junit:4.13.2'
api(project(":libsignal")) implementation(project(":libsignal"))
implementation "com.google.protobuf:protobuf-java:$protobufVersion"
androidTestImplementation 'androidx.test.ext:junit:1.1.4' androidTestImplementation 'androidx.test.ext:junit:1.1.4'
androidTestImplementation 'androidx.test.espresso:espresso-core:3.5.0' androidTestImplementation 'androidx.test.espresso:espresso-core:3.5.0'
} }

View File

@ -4,6 +4,7 @@ import network.loki.messenger.libsession_util.util.ConfigWithSeqNo
import network.loki.messenger.libsession_util.util.Contact import network.loki.messenger.libsession_util.util.Contact
import network.loki.messenger.libsession_util.util.Conversation import network.loki.messenger.libsession_util.util.Conversation
import network.loki.messenger.libsession_util.util.UserPic import network.loki.messenger.libsession_util.util.UserPic
import org.session.libsignal.protos.SignalServiceProtos.SharedConfigMessage.Kind
sealed class ConfigBase(protected val /* yucky */ pointer: Long) { sealed class ConfigBase(protected val /* yucky */ pointer: Long) {
@ -13,6 +14,12 @@ sealed class ConfigBase(protected val /* yucky */ pointer: Long) {
} }
external fun kindFor(configNamespace: Int): Class<ConfigBase> external fun kindFor(configNamespace: Int): Class<ConfigBase>
fun ConfigBase.protoKindFor(): Kind = when (this) {
is UserProfile -> Kind.USER_PROFILE
is Contacts -> Kind.CONTACTS
is ConversationVolatileConfig -> Kind.CONVO_INFO_VOLATILE
}
const val isNewConfigEnabled = true const val isNewConfigEnabled = true
} }

View File

@ -35,7 +35,6 @@ dependencies {
implementation 'com.annimon:stream:1.1.8' implementation 'com.annimon:stream:1.1.8'
implementation 'com.makeramen:roundedimageview:2.1.0' implementation 'com.makeramen:roundedimageview:2.1.0'
implementation 'com.esotericsoftware:kryo:5.1.1' implementation 'com.esotericsoftware:kryo:5.1.1'
implementation "com.google.protobuf:protobuf-java:$protobufVersion"
implementation "com.fasterxml.jackson.core:jackson-databind:$jacksonDatabindVersion" implementation "com.fasterxml.jackson.core:jackson-databind:$jacksonDatabindVersion"
implementation "com.github.oxen-io.session-android-curve-25519:curve25519-java:$curve25519Version" implementation "com.github.oxen-io.session-android-curve-25519:curve25519-java:$curve25519Version"
implementation "com.squareup.okhttp3:okhttp:$okhttpVersion" implementation "com.squareup.okhttp3:okhttp:$okhttpVersion"

View File

@ -8,6 +8,7 @@ import org.session.libsession.messaging.contacts.Contact
import org.session.libsession.messaging.jobs.AttachmentUploadJob import org.session.libsession.messaging.jobs.AttachmentUploadJob
import org.session.libsession.messaging.jobs.Job import org.session.libsession.messaging.jobs.Job
import org.session.libsession.messaging.jobs.MessageSendJob import org.session.libsession.messaging.jobs.MessageSendJob
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.messaging.messages.Message import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.control.ConfigurationMessage import org.session.libsession.messaging.messages.control.ConfigurationMessage
import org.session.libsession.messaging.messages.control.MessageRequestResponse import org.session.libsession.messaging.messages.control.MessageRequestResponse
@ -50,6 +51,7 @@ interface StorageProtocol {
fun getMessageSendJob(messageSendJobID: String): MessageSendJob? fun getMessageSendJob(messageSendJobID: String): MessageSendJob?
fun getMessageReceiveJob(messageReceiveJobID: String): Job? fun getMessageReceiveJob(messageReceiveJobID: String): Job?
fun getGroupAvatarDownloadJob(server: String, room: String): Job? fun getGroupAvatarDownloadJob(server: String, room: String): Job?
fun getConfigSyncJob(destination: Destination): Job?
fun resumeMessageSendJobIfNeeded(messageSendJobID: String) fun resumeMessageSendJobIfNeeded(messageSendJobID: String)
fun isJobCanceled(job: Job): Boolean fun isJobCanceled(job: Job): Boolean

View File

@ -1,9 +1,15 @@
package org.session.libsession.messaging.jobs package org.session.libsession.messaging.jobs
import network.loki.messenger.libsession_util.ConfigBase import network.loki.messenger.libsession_util.ConfigBase
import network.loki.messenger.libsession_util.ConfigBase.Companion.protoKindFor
import nl.komponents.kovenant.functional.bind
import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.messages.Destination import org.session.libsession.messaging.messages.Destination
import org.session.libsession.messaging.messages.control.SharedConfigurationMessage
import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.messaging.utilities.Data import org.session.libsession.messaging.utilities.Data
import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Log
// only contact (self) and closed group destinations will be supported // only contact (self) and closed group destinations will be supported
@ -16,20 +22,122 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
override suspend fun execute() { override suspend fun execute() {
val userEdKeyPair = MessagingModuleConfiguration.shared.getUserED25519KeyPair() val userEdKeyPair = MessagingModuleConfiguration.shared.getUserED25519KeyPair()
if (destination is Destination.ClosedGroup val userPublicKey = MessagingModuleConfiguration.shared.storage.getUserPublicKey()
val delegate = delegate
if (destination is Destination.ClosedGroup // TODO: closed group configs will be handled in closed group feature
// if we haven't enabled the new configs don't run
|| !ConfigBase.isNewConfigEnabled || !ConfigBase.isNewConfigEnabled
// if we don't have a user ed key pair for signing updates
|| userEdKeyPair == null || userEdKeyPair == null
// this will be useful to not handle null delegate cases
|| delegate == null
// check our local identity key exists
|| userPublicKey.isNullOrEmpty()
// don't allow pushing configs for non-local user
|| (destination is Destination.Contact && destination.publicKey != userPublicKey)
) { ) {
// TODO: currently we only deal with single destination until closed groups refactor / implement LCG Log.w(TAG, "No need to run config sync job, TODO")
Log.w(TAG, "Not handling config sync job, TODO")
delegate?.handleJobSucceeded(this) delegate?.handleJobSucceeded(this)
return return
} }
// configFactory singleton instance will come in handy for modifying hashes and fetching configs for namespace etc
val configFactory = MessagingModuleConfiguration.shared.configFactory val configFactory = MessagingModuleConfiguration.shared.configFactory
// get latest states, filter out configs that don't need push
val configsRequiringPush = listOfNotNull(
configFactory.user,
configFactory.contacts,
configFactory.convoVolatile
).filter { config -> config.needsPush() }
// don't run anything if we don't need to push anything
if (configsRequiringPush.isEmpty()) return delegate.handleJobSucceeded(this)
// allow null results here so the list index matches configsRequiringPush
val batchObjects: List<Pair<SharedConfigurationMessage, SnodeAPI.SnodeBatchRequestInfo>?> = configsRequiringPush.map { config ->
val (data, seqNo) = config.push()
SharedConfigurationMessage(config.protoKindFor(), data, seqNo) to config
}.map { (message, config) ->
// return a list of batch request objects
val snodeMessage = MessageSender.buildWrappedMessageToSnode(destination, message, true)
val authenticated = SnodeAPI.buildAuthenticatedStoreBatchInfo(
destination.destinationPublicKey(),
config.configNamespace(),
snodeMessage
) ?: return@map null // this entry will be null otherwise
message to authenticated // to keep track of seqNo for calling confirmPushed later
}
val toDeleteRequest = configsRequiringPush.map { base ->
configFactory.getHashesFor(base)
// accumulate by adding together
}.reduce(List<String>::plus).let { toDeleteFromAllNamespaces ->
if (toDeleteFromAllNamespaces.isEmpty()) null
else SnodeAPI.buildAuthenticatedDeleteBatchInfo(destination.destinationPublicKey(), toDeleteFromAllNamespaces)
}
if (batchObjects.any { it == null }) {
// stop running here, something like a signing error occurred
return delegate.handleJobFailedPermanently(this, NullPointerException("One or more requests had a null batch request info"))
}
val allRequests = mutableListOf<SnodeAPI.SnodeBatchRequestInfo>()
allRequests += batchObjects.requireNoNulls().map { (_, request) -> request }
// add in the deletion if we have any hashes
if (toDeleteRequest != null) allRequests += toDeleteRequest
val batchResponse = SnodeAPI.getSingleTargetSnode(destination.destinationPublicKey()).bind { snode ->
SnodeAPI.getRawBatchResponse(
snode,
destination.destinationPublicKey(),
allRequests,
sequence = true
)
}
try {
val rawResponses = batchResponse.get()
@Suppress("UNCHECKED_CAST")
val responseList = (rawResponses["results"] as List<RawResponse>)
// we are always adding in deletions at the end
val deletionResponse = if (toDeleteRequest != null) responseList.last() else null
val deletedHashes = deletionResponse?.let {
@Suppress("UNCHECKED_CAST")
deletionResponse["deleted"] as? List<String>
}?.toSet() ?: emptySet()
// at this point responseList index should line up with configsRequiringPush index
configsRequiringPush.forEachIndexed { index, config ->
val (toPushMessage, _) = batchObjects[index]!!
val response = responseList[index]
val insertHash = response["hash"] as? String ?: run {
Log.w(TAG, "No hash returned for the configuration in namespace ${config.configNamespace()}")
return@forEachIndexed
}
// confirm pushed seqno
val thisSeqNo = toPushMessage.seqNo
config.confirmPushed(thisSeqNo)
// wipe any of the existing hashes which we deleted (they may or may not be in this namespace)
if (configFactory.removeHashesFor(config, deletedHashes.toSet())) {
Log.d(TAG, "Successfully removed the deleted hashes from ${config.javaClass.simpleName}")
}
// store the new hash in list of hashes to track against
configFactory.appendHash(config, insertHash)
// dump and write config after successful
configFactory.persist(config)
}
} catch (e: Exception) {
Log.e(TAG, "Error performing batch request")
}
delegate.handleJobSucceeded(this)
}
fun Destination.destinationPublicKey(): String = when (this) {
is Destination.Contact -> publicKey
is Destination.ClosedGroup -> groupPublicKey
else -> throw NullPointerException("Not public key for this destination")
} }
override fun serialize(): Data { override fun serialize(): Data {
@ -74,5 +182,4 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
return ConfigurationSyncJob(destination) return ConfigurationSyncJob(destination)
} }
} }
} }

View File

@ -217,6 +217,7 @@ class JobQueue : JobDelegate {
GroupAvatarDownloadJob.KEY, GroupAvatarDownloadJob.KEY,
BackgroundGroupAddJob.KEY, BackgroundGroupAddJob.KEY,
OpenGroupDeleteJob.KEY, OpenGroupDeleteJob.KEY,
ConfigurationSyncJob.KEY,
) )
allJobTypes.forEach { type -> allJobTypes.forEach { type ->
resumePendingJobs(type) resumePendingJobs(type)

View File

@ -13,6 +13,7 @@ import org.session.libsession.messaging.messages.control.ClosedGroupControlMessa
import org.session.libsession.messaging.messages.control.ConfigurationMessage import org.session.libsession.messaging.messages.control.ConfigurationMessage
import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
import org.session.libsession.messaging.messages.control.MessageRequestResponse import org.session.libsession.messaging.messages.control.MessageRequestResponse
import org.session.libsession.messaging.messages.control.SharedConfigurationMessage
import org.session.libsession.messaging.messages.control.UnsendRequest import org.session.libsession.messaging.messages.control.UnsendRequest
import org.session.libsession.messaging.messages.visible.LinkPreview import org.session.libsession.messaging.messages.visible.LinkPreview
import org.session.libsession.messaging.messages.visible.Quote import org.session.libsession.messaging.messages.visible.Quote
@ -70,46 +71,45 @@ object MessageSender {
} }
// One-on-One Chats & Closed Groups // One-on-One Chats & Closed Groups
private fun sendToSnodeDestination(destination: Destination, message: Message, isSyncMessage: Boolean = false): Promise<Unit, Exception> { @Throws(Exception::class)
val deferred = deferred<Unit, Exception>() fun buildWrappedMessageToSnode(destination: Destination, message: Message, isSyncMessage: Boolean): SnodeMessage {
val promise = deferred.promise
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
val userPublicKey = storage.getUserPublicKey() val userPublicKey = storage.getUserPublicKey()
// Set the timestamp, sender and recipient // Set the timestamp, sender and recipient
val messageSendTime = SnodeAPI.nowWithOffset
if (message.sentTimestamp == null) { if (message.sentTimestamp == null) {
message.sentTimestamp = System.currentTimeMillis() // Visible messages will already have their sent timestamp set message.sentTimestamp =
messageSendTime // Visible messages will already have their sent timestamp set
} }
val messageSendTime = System.currentTimeMillis()
message.sender = userPublicKey message.sender = userPublicKey
val isSelfSend = (message.recipient == userPublicKey)
// Set the failure handler (need it here already for precondition failure handling)
fun handleFailure(error: Exception) {
handleFailedMessageSend(message, error)
if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend) {
SnodeModule.shared.broadcaster.broadcast("messageFailed", message.sentTimestamp!!)
}
deferred.reject(error)
}
try {
when (destination) { when (destination) {
is Destination.Contact -> message.recipient = destination.publicKey is Destination.Contact -> message.recipient = destination.publicKey
is Destination.ClosedGroup -> message.recipient = destination.groupPublicKey is Destination.ClosedGroup -> message.recipient = destination.groupPublicKey
else -> throw IllegalStateException("Destination should not be an open group.") else -> throw IllegalStateException("Destination should not be an open group.")
} }
val isSelfSend = (message.recipient == userPublicKey)
// Validate the message // Validate the message
if (!message.isValid()) { throw Error.InvalidMessage } if (!message.isValid()) {
throw Error.InvalidMessage
}
// Stop here if this is a self-send, unless it's: // Stop here if this is a self-send, unless it's:
// • a configuration message // • a configuration message
// • a sync message // • a sync message
// • a closed group control message of type `new` // • a closed group control message of type `new`
var isNewClosedGroupControlMessage = false var isNewClosedGroupControlMessage = false
if (message is ClosedGroupControlMessage && message.kind is ClosedGroupControlMessage.Kind.New) isNewClosedGroupControlMessage = true if (message is ClosedGroupControlMessage && message.kind is ClosedGroupControlMessage.Kind.New) isNewClosedGroupControlMessage =
if (isSelfSend && message !is ConfigurationMessage && !isSyncMessage && !isNewClosedGroupControlMessage && message !is UnsendRequest) { true
handleSuccessfulMessageSend(message, destination) if (isSelfSend
deferred.resolve(Unit) && message !is ConfigurationMessage
return promise && !isSyncMessage
&& !isNewClosedGroupControlMessage
&& message !is UnsendRequest
&& message !is SharedConfigurationMessage
) {
throw Error.InvalidMessage
} }
// Attach the user's profile if needed // Attach the user's profile if needed
if (message is VisibleMessage) { if (message is VisibleMessage) {
@ -126,7 +126,10 @@ object MessageSender {
val ciphertext = when (destination) { val ciphertext = when (destination) {
is Destination.Contact -> MessageEncrypter.encrypt(plaintext, destination.publicKey) is Destination.Contact -> MessageEncrypter.encrypt(plaintext, destination.publicKey)
is Destination.ClosedGroup -> { is Destination.ClosedGroup -> {
val encryptionKeyPair = MessagingModuleConfiguration.shared.storage.getLatestClosedGroupEncryptionKeyPair(destination.groupPublicKey)!! val encryptionKeyPair =
MessagingModuleConfiguration.shared.storage.getLatestClosedGroupEncryptionKeyPair(
destination.groupPublicKey
)!!
MessageEncrypter.encrypt(plaintext, encryptionKeyPair.hexEncodedPublicKey) MessageEncrypter.encrypt(plaintext, encryptionKeyPair.hexEncodedPublicKey)
} }
else -> throw IllegalStateException("Destination should not be open group.") else -> throw IllegalStateException("Destination should not be open group.")
@ -134,15 +137,6 @@ object MessageSender {
// Wrap the result // Wrap the result
val kind: SignalServiceProtos.Envelope.Type val kind: SignalServiceProtos.Envelope.Type
val senderPublicKey: String val senderPublicKey: String
// TODO: this might change in future for config messages
val forkInfo = SnodeAPI.forkInfo
val namespaces: List<Int> = when {
destination is Destination.ClosedGroup
&& forkInfo.defaultRequiresAuth() -> listOf(Namespace.UNAUTHENTICATED_CLOSED_GROUP)
destination is Destination.ClosedGroup
&& forkInfo.hasNamespaces() -> listOf(Namespace.UNAUTHENTICATED_CLOSED_GROUP, Namespace.DEFAULT)
else -> listOf(Namespace.DEFAULT)
}
when (destination) { when (destination) {
is Destination.Contact -> { is Destination.Contact -> {
kind = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE kind = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE
@ -155,16 +149,43 @@ object MessageSender {
else -> throw IllegalStateException("Destination should not be open group.") else -> throw IllegalStateException("Destination should not be open group.")
} }
val wrappedMessage = MessageWrapper.wrap(kind, message.sentTimestamp!!, senderPublicKey, ciphertext) val wrappedMessage = MessageWrapper.wrap(kind, message.sentTimestamp!!, senderPublicKey, ciphertext)
// Send the result
if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend) {
SnodeModule.shared.broadcaster.broadcast("calculatingPoW", messageSendTime)
}
val base64EncodedData = Base64.encodeBytes(wrappedMessage) val base64EncodedData = Base64.encodeBytes(wrappedMessage)
// Send the result // Send the result
val timestamp = messageSendTime + SnodeAPI.clockOffset return SnodeMessage(
val snodeMessage = SnodeMessage(message.recipient!!, base64EncodedData, message.ttl, timestamp) message.recipient!!,
if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend) { base64EncodedData,
SnodeModule.shared.broadcaster.broadcast("sendingMessage", messageSendTime) message.ttl,
messageSendTime
)
}
private fun sendToSnodeDestination(destination: Destination, message: Message, isSyncMessage: Boolean = false): Promise<Unit, Exception> {
val deferred = deferred<Unit, Exception>()
val promise = deferred.promise
val storage = MessagingModuleConfiguration.shared.storage
val userPublicKey = storage.getUserPublicKey()
// recipient will be set later, so initialize it as a function here
val isSelfSend = { message.recipient == userPublicKey }
// Set the failure handler (need it here already for precondition failure handling)
fun handleFailure(error: Exception) {
handleFailedMessageSend(message, error)
if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend()) {
SnodeModule.shared.broadcaster.broadcast("messageFailed", message.sentTimestamp!!)
}
deferred.reject(error)
}
try {
val snodeMessage = buildWrappedMessageToSnode(destination, message, isSyncMessage)
// TODO: this might change in future for config messages
val forkInfo = SnodeAPI.forkInfo
val namespaces: List<Int> = when {
destination is Destination.ClosedGroup
&& forkInfo.defaultRequiresAuth() -> listOf(Namespace.UNAUTHENTICATED_CLOSED_GROUP)
destination is Destination.ClosedGroup
&& forkInfo.hasNamespaces() -> listOf(Namespace.UNAUTHENTICATED_CLOSED_GROUP, Namespace.DEFAULT)
else -> listOf(Namespace.DEFAULT)
} }
namespaces.map { namespace -> SnodeAPI.sendMessage(snodeMessage, requiresAuth = false, namespace = namespace) }.let { promises -> namespaces.map { namespace -> SnodeAPI.sendMessage(snodeMessage, requiresAuth = false, namespace = namespace) }.let { promises ->
var isSuccess = false var isSuccess = false
@ -174,9 +195,6 @@ object MessageSender {
promise.success { promise.success {
if (isSuccess) { return@success } // Succeed as soon as the first promise succeeds if (isSuccess) { return@success } // Succeed as soon as the first promise succeeds
isSuccess = true isSuccess = true
if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend) {
SnodeModule.shared.broadcaster.broadcast("messageSent", messageSendTime)
}
val hash = it["hash"] as? String val hash = it["hash"] as? String
message.serverHash = hash message.serverHash = hash
handleSuccessfulMessageSend(message, destination, isSyncMessage) handleSuccessfulMessageSend(message, destination, isSyncMessage)

View File

@ -163,7 +163,8 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
val requestSparseArray = SparseArray<SnodeAPI.SnodeBatchRequestInfo>() val requestSparseArray = SparseArray<SnodeAPI.SnodeBatchRequestInfo>()
// get messages // get messages
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(snode, userPublicKey)!!.also { personalMessages -> SnodeAPI.buildAuthenticatedRetrieveBatchRequest(snode, userPublicKey)!!.also { personalMessages ->
requestSparseArray[personalMessages.namespace] = personalMessages // namespaces here should always be set
requestSparseArray[personalMessages.namespace!!] = personalMessages
} }
// get the latest convo info volatile // get the latest convo info volatile
listOfNotNull(configFactory.user, configFactory.contacts, configFactory.convoVolatile).mapNotNull { config -> listOfNotNull(configFactory.user, configFactory.contacts, configFactory.convoVolatile).mapNotNull { config ->
@ -172,7 +173,8 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
config.configNamespace() config.configNamespace()
) )
}.forEach { request -> }.forEach { request ->
requestSparseArray[request.namespace] = request // namespaces here should always be set
requestSparseArray[request.namespace!!] = request
} }
val requests = requestSparseArray.valueIterator().asSequence().toList() val requests = requestSparseArray.valueIterator().asSequence().toList()

View File

@ -101,7 +101,7 @@ object SnodeAPI {
val method: String, val method: String,
val params: Map<String, Any>, val params: Map<String, Any>,
@Transient @Transient
val namespace: Int val namespace: Int?
) // assume signatures, pubkey and namespaces are attached in parameters if required ) // assume signatures, pubkey and namespaces are attached in parameters if required
// Internal API // Internal API
@ -365,10 +365,93 @@ object SnodeAPI {
return invoke(Snode.Method.Retrieve, snode, parameters, publicKey) return invoke(Snode.Method.Retrieve, snode, parameters, publicKey)
} }
fun buildAuthenticatedStoreBatchInfo(publicKey: String, namespace: Int, message: SnodeMessage): SnodeBatchRequestInfo? {
val params = mutableMapOf<String, Any>()
// load the message data params into the sub request
// currently loads:
// pubKey
// data
// ttl
// timestamp
params.putAll(message.toJSON())
params["namespace"] = namespace
// used for sig generation since it is also the value used in timestamp parameter
val messageTimestamp = message.timestamp
val userEd25519KeyPair = try {
MessagingModuleConfiguration.shared.getUserED25519KeyPair() ?: return null
} catch (e: Exception) {
return null
}
val ed25519PublicKey = userEd25519KeyPair.publicKey.asHexString
val signature = ByteArray(Sign.BYTES)
val verificationData = "store$namespace$messageTimestamp".toByteArray()
try {
sodium.cryptoSignDetached(
signature,
verificationData,
verificationData.size.toLong(),
userEd25519KeyPair.secretKey.asBytes
)
} catch (e: Exception) {
Log.e("Loki", "Signing data failed with user secret key", e)
}
// timestamp already set
params["pubkey_ed25519"] = ed25519PublicKey
params["signature"] = Base64.encodeBytes(signature)
return SnodeBatchRequestInfo(
Snode.Method.SendMessage.rawValue,
params,
namespace
)
}
/**
* Message hashes can be shared across multiple namespaces (for a single public key destination)
* @param publicKey the destination's identity public key to delete from (05...)
* @param messageHashes a list of stored message hashes to delete from the server
* @param required indicates that *at least one* message in the list is deleted from the server, otherwise it will return 404
*/
fun buildAuthenticatedDeleteBatchInfo(publicKey: String, messageHashes: List<String>, required: Boolean = false): SnodeBatchRequestInfo? {
val params = mutableMapOf(
"pubkey" to publicKey,
"required" to required, // could be omitted technically but explicit here
"messages" to messageHashes
)
val userEd25519KeyPair = try {
MessagingModuleConfiguration.shared.getUserED25519KeyPair() ?: return null
} catch (e: Exception) {
return null
}
val ed25519PublicKey = userEd25519KeyPair.publicKey.asHexString
val signature = ByteArray(Sign.BYTES)
val verificationData = "delete${messageHashes.joinToString("")}".toByteArray()
try {
sodium.cryptoSignDetached(
signature,
verificationData,
verificationData.size.toLong(),
userEd25519KeyPair.secretKey.asBytes
)
} catch (e: Exception) {
Log.e("Loki", "Signing data failed with user secret key", e)
return null
}
params["pubkey_ed25519"] = ed25519PublicKey
params["signature"] = Base64.encodeBytes(signature)
return SnodeBatchRequestInfo(
Snode.Method.Retrieve.rawValue,
params,
null
)
}
fun buildAuthenticatedRetrieveBatchRequest(snode: Snode, publicKey: String, namespace: Int = 0): SnodeBatchRequestInfo? { fun buildAuthenticatedRetrieveBatchRequest(snode: Snode, publicKey: String, namespace: Int = 0): SnodeBatchRequestInfo? {
val lastHashValue = database.getLastMessageHashValue(snode, publicKey, namespace) ?: "" val lastHashValue = database.getLastMessageHashValue(snode, publicKey, namespace) ?: ""
val params = mutableMapOf<String, Any>( val params = mutableMapOf<String, Any>(
"pubKey" to publicKey, "pubkey" to publicKey,
"last_hash" to lastHashValue, "last_hash" to lastHashValue,
) )
val userEd25519KeyPair = try { val userEd25519KeyPair = try {
@ -405,11 +488,11 @@ object SnodeAPI {
) )
} }
fun getRawBatchResponse(snode: Snode, publicKey: String, requests: List<SnodeBatchRequestInfo>): RawResponsePromise { fun getRawBatchResponse(snode: Snode, publicKey: String, requests: List<SnodeBatchRequestInfo>, sequence: Boolean = false): RawResponsePromise {
val parameters = mutableMapOf<String, Any>( val parameters = mutableMapOf<String, Any>(
"requests" to requests "requests" to requests
) )
return invoke(Snode.Method.Batch, snode, parameters, publicKey) return invoke(if (sequence) Snode.Method.Sequence else Snode.Method.Batch, snode, parameters, publicKey)
} }
fun getExpiries(messageHashes: List<String>, publicKey: String) : RawResponsePromise { fun getExpiries(messageHashes: List<String>, publicKey: String) : RawResponsePromise {

View File

@ -12,4 +12,6 @@ interface ConfigFactoryProtocol {
fun persist(forConfigObject: ConfigBase) fun persist(forConfigObject: ConfigBase)
fun appendHash(configObject: ConfigBase, hash: String) fun appendHash(configObject: ConfigBase, hash: String)
fun notifyUpdates(forConfigObject: ConfigBase) fun notifyUpdates(forConfigObject: ConfigBase)
fun getHashesFor(forConfigObject: ConfigBase): List<String>
fun removeHashesFor(config: ConfigBase, deletedHashes: Set<String>): Boolean
} }

View File

@ -12,6 +12,7 @@ class Snode(val address: String, val port: Int, val publicKeySet: KeySet?) {
Info("info"), Info("info"),
DeleteAll("delete_all"), DeleteAll("delete_all"),
Batch("batch"), Batch("batch"),
Sequence("sequence"),
Expire("expire"), Expire("expire"),
GetExpiries("get_expiries") GetExpiries("get_expiries")
} }