From acd14843b81f9845590bfffd880790c57f450b45 Mon Sep 17 00:00:00 2001 From: 0x330a <92654767+0x330a@users.noreply.github.com> Date: Fri, 10 Feb 2023 16:41:33 +1100 Subject: [PATCH] feat: expand on the config sync job, finish basic implementation to test against --- .../securesms/database/Storage.kt | 8 + .../securesms/dependencies/ConfigFactory.kt | 14 ++ .../util/ConfigurationMessageUtilities.kt | 17 +- libsession-util/build.gradle | 3 +- .../loki/messenger/libsession_util/Config.kt | 7 + libsession/build.gradle | 1 - .../libsession/database/StorageProtocol.kt | 2 + .../messaging/jobs/ConfigurationSyncJob.kt | 115 +++++++++++- .../libsession/messaging/jobs/JobQueue.kt | 1 + .../sending_receiving/MessageSender.kt | 168 ++++++++++-------- .../sending_receiving/pollers/Poller.kt | 6 +- .../org/session/libsession/snode/SnodeAPI.kt | 91 +++++++++- .../utilities/ConfigFactoryProtocol.kt | 2 + .../org/session/libsignal/utilities/Snode.kt | 1 + 14 files changed, 345 insertions(+), 91 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index 272f12b65f..05eb1b468a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -8,11 +8,13 @@ import org.session.libsession.messaging.BlindedIdMapping import org.session.libsession.messaging.calls.CallMessageType import org.session.libsession.messaging.contacts.Contact import org.session.libsession.messaging.jobs.AttachmentUploadJob +import org.session.libsession.messaging.jobs.ConfigurationSyncJob import org.session.libsession.messaging.jobs.GroupAvatarDownloadJob import org.session.libsession.messaging.jobs.Job import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.jobs.MessageReceiveJob import org.session.libsession.messaging.jobs.MessageSendJob +import org.session.libsession.messaging.messages.Destination import org.session.libsession.messaging.messages.Message import org.session.libsession.messaging.messages.control.ConfigurationMessage import org.session.libsession.messaging.messages.control.MessageRequestResponse @@ -240,6 +242,12 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, return DatabaseComponent.get(context).sessionJobDatabase().getGroupAvatarDownloadJob(server, room) } + override fun getConfigSyncJob(destination: Destination): Job? { + return DatabaseComponent.get(context).sessionJobDatabase().getAllPendingJobs(ConfigurationSyncJob.KEY).values.firstOrNull { + (it as? ConfigurationSyncJob)?.destination == destination + } + } + override fun resumeMessageSendJobIfNeeded(messageSendJobID: String) { val job = DatabaseComponent.get(context).sessionJobDatabase().getMessageSendJob(messageSendJobID) ?: return JobQueue.shared.resumePendingSendMessage(job) diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt index a29802a895..7679c97bc7 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt @@ -135,6 +135,20 @@ class ConfigFactory(private val context: Context, } } + override fun getHashesFor(forConfigObject: ConfigBase): List = + when (forConfigObject) { + is UserProfile -> userHashes.toList() + is Contacts -> contactsHashes.toList() + is ConversationVolatileConfig -> convoHashes.toList() + } + + override fun removeHashesFor(forConfigObject: ConfigBase, deletedHashes: Set) = + when (forConfigObject) { + is UserProfile -> userHashes.removeAll(deletedHashes) + is Contacts -> contactsHashes.removeAll(deletedHashes) + is ConversationVolatileConfig -> convoHashes.removeAll(deletedHashes) + } + private fun updateUser(userProfile: UserProfile) { val (_, userPublicKey) = maybeGetUserInfo() ?: return // would love to get rid of recipient and context from this diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/ConfigurationMessageUtilities.kt b/app/src/main/java/org/thoughtcrime/securesms/util/ConfigurationMessageUtilities.kt index 6ba15352d7..6773b2afff 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/util/ConfigurationMessageUtilities.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/util/ConfigurationMessageUtilities.kt @@ -8,22 +8,31 @@ import network.loki.messenger.libsession_util.util.Contact import network.loki.messenger.libsession_util.util.UserPic import nl.komponents.kovenant.Promise import org.session.libsession.messaging.MessagingModuleConfiguration +import org.session.libsession.messaging.jobs.ConfigurationSyncJob +import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.messages.Destination import org.session.libsession.messaging.messages.control.ConfigurationMessage import org.session.libsession.messaging.sending_receiving.MessageSender import org.session.libsession.utilities.Address import org.session.libsession.utilities.TextSecurePreferences +import org.session.libsignal.utilities.Log object ConfigurationMessageUtilities { @JvmStatic fun syncConfigurationIfNeeded(context: Context) { // add if check here to schedule new config job process and return early - if (ConfigBase.isNewConfigEnabled) { - // schedule job if none exist - TODO() - } val userPublicKey = TextSecurePreferences.getLocalNumber(context) ?: return + val storage = MessagingModuleConfiguration.shared.storage + if (ConfigBase.isNewConfigEnabled) { + // don't schedule job if we already have one + val ourDestination = Destination.Contact(userPublicKey) + if (storage.getConfigSyncJob(ourDestination) != null) return + val newConfigSync = ConfigurationSyncJob(ourDestination) + Log.d("Loki", "Scheduling new ConfigurationSyncJob") + JobQueue.shared.add(newConfigSync) + return + } val lastSyncTime = TextSecurePreferences.getLastConfigurationSyncTime(context) val now = System.currentTimeMillis() if (now - lastSyncTime < 7 * 24 * 60 * 60 * 1000) return diff --git a/libsession-util/build.gradle b/libsession-util/build.gradle index 1ae54bcb7d..1b9c4eac37 100644 --- a/libsession-util/build.gradle +++ b/libsession-util/build.gradle @@ -40,7 +40,8 @@ android { dependencies { testImplementation 'junit:junit:4.13.2' - api(project(":libsignal")) + implementation(project(":libsignal")) + implementation "com.google.protobuf:protobuf-java:$protobufVersion" androidTestImplementation 'androidx.test.ext:junit:1.1.4' androidTestImplementation 'androidx.test.espresso:espresso-core:3.5.0' } \ No newline at end of file diff --git a/libsession-util/src/main/java/network/loki/messenger/libsession_util/Config.kt b/libsession-util/src/main/java/network/loki/messenger/libsession_util/Config.kt index 7fa62241a9..3f3a464c5d 100644 --- a/libsession-util/src/main/java/network/loki/messenger/libsession_util/Config.kt +++ b/libsession-util/src/main/java/network/loki/messenger/libsession_util/Config.kt @@ -4,6 +4,7 @@ import network.loki.messenger.libsession_util.util.ConfigWithSeqNo import network.loki.messenger.libsession_util.util.Contact import network.loki.messenger.libsession_util.util.Conversation import network.loki.messenger.libsession_util.util.UserPic +import org.session.libsignal.protos.SignalServiceProtos.SharedConfigMessage.Kind sealed class ConfigBase(protected val /* yucky */ pointer: Long) { @@ -13,6 +14,12 @@ sealed class ConfigBase(protected val /* yucky */ pointer: Long) { } external fun kindFor(configNamespace: Int): Class + fun ConfigBase.protoKindFor(): Kind = when (this) { + is UserProfile -> Kind.USER_PROFILE + is Contacts -> Kind.CONTACTS + is ConversationVolatileConfig -> Kind.CONVO_INFO_VOLATILE + } + const val isNewConfigEnabled = true } diff --git a/libsession/build.gradle b/libsession/build.gradle index b160d9aab2..eeb9b91f6c 100644 --- a/libsession/build.gradle +++ b/libsession/build.gradle @@ -35,7 +35,6 @@ dependencies { implementation 'com.annimon:stream:1.1.8' implementation 'com.makeramen:roundedimageview:2.1.0' implementation 'com.esotericsoftware:kryo:5.1.1' - implementation "com.google.protobuf:protobuf-java:$protobufVersion" implementation "com.fasterxml.jackson.core:jackson-databind:$jacksonDatabindVersion" implementation "com.github.oxen-io.session-android-curve-25519:curve25519-java:$curve25519Version" implementation "com.squareup.okhttp3:okhttp:$okhttpVersion" diff --git a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt index aad8394b4c..bdde41817c 100644 --- a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt @@ -8,6 +8,7 @@ import org.session.libsession.messaging.contacts.Contact import org.session.libsession.messaging.jobs.AttachmentUploadJob import org.session.libsession.messaging.jobs.Job import org.session.libsession.messaging.jobs.MessageSendJob +import org.session.libsession.messaging.messages.Destination import org.session.libsession.messaging.messages.Message import org.session.libsession.messaging.messages.control.ConfigurationMessage import org.session.libsession.messaging.messages.control.MessageRequestResponse @@ -50,6 +51,7 @@ interface StorageProtocol { fun getMessageSendJob(messageSendJobID: String): MessageSendJob? fun getMessageReceiveJob(messageReceiveJobID: String): Job? fun getGroupAvatarDownloadJob(server: String, room: String): Job? + fun getConfigSyncJob(destination: Destination): Job? fun resumeMessageSendJobIfNeeded(messageSendJobID: String) fun isJobCanceled(job: Job): Boolean diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/ConfigurationSyncJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/ConfigurationSyncJob.kt index 739d61b2da..4e66cb4eaa 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/ConfigurationSyncJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/ConfigurationSyncJob.kt @@ -1,9 +1,15 @@ package org.session.libsession.messaging.jobs import network.loki.messenger.libsession_util.ConfigBase +import network.loki.messenger.libsession_util.ConfigBase.Companion.protoKindFor +import nl.komponents.kovenant.functional.bind import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.messages.Destination +import org.session.libsession.messaging.messages.control.SharedConfigurationMessage +import org.session.libsession.messaging.sending_receiving.MessageSender import org.session.libsession.messaging.utilities.Data +import org.session.libsession.snode.RawResponse +import org.session.libsession.snode.SnodeAPI import org.session.libsignal.utilities.Log // only contact (self) and closed group destinations will be supported @@ -16,20 +22,122 @@ data class ConfigurationSyncJob(val destination: Destination): Job { override suspend fun execute() { val userEdKeyPair = MessagingModuleConfiguration.shared.getUserED25519KeyPair() - if (destination is Destination.ClosedGroup + val userPublicKey = MessagingModuleConfiguration.shared.storage.getUserPublicKey() + val delegate = delegate + if (destination is Destination.ClosedGroup // TODO: closed group configs will be handled in closed group feature + // if we haven't enabled the new configs don't run || !ConfigBase.isNewConfigEnabled + // if we don't have a user ed key pair for signing updates || userEdKeyPair == null + // this will be useful to not handle null delegate cases + || delegate == null + // check our local identity key exists + || userPublicKey.isNullOrEmpty() + // don't allow pushing configs for non-local user + || (destination is Destination.Contact && destination.publicKey != userPublicKey) ) { - // TODO: currently we only deal with single destination until closed groups refactor / implement LCG - Log.w(TAG, "Not handling config sync job, TODO") + Log.w(TAG, "No need to run config sync job, TODO") delegate?.handleJobSucceeded(this) return } + // configFactory singleton instance will come in handy for modifying hashes and fetching configs for namespace etc val configFactory = MessagingModuleConfiguration.shared.configFactory + // get latest states, filter out configs that don't need push + val configsRequiringPush = listOfNotNull( + configFactory.user, + configFactory.contacts, + configFactory.convoVolatile + ).filter { config -> config.needsPush() } + // don't run anything if we don't need to push anything + if (configsRequiringPush.isEmpty()) return delegate.handleJobSucceeded(this) + // allow null results here so the list index matches configsRequiringPush + val batchObjects: List?> = configsRequiringPush.map { config -> + val (data, seqNo) = config.push() + SharedConfigurationMessage(config.protoKindFor(), data, seqNo) to config + }.map { (message, config) -> + // return a list of batch request objects + val snodeMessage = MessageSender.buildWrappedMessageToSnode(destination, message, true) + val authenticated = SnodeAPI.buildAuthenticatedStoreBatchInfo( + destination.destinationPublicKey(), + config.configNamespace(), + snodeMessage + ) ?: return@map null // this entry will be null otherwise + message to authenticated // to keep track of seqNo for calling confirmPushed later + } + + val toDeleteRequest = configsRequiringPush.map { base -> + configFactory.getHashesFor(base) + // accumulate by adding together + }.reduce(List::plus).let { toDeleteFromAllNamespaces -> + if (toDeleteFromAllNamespaces.isEmpty()) null + else SnodeAPI.buildAuthenticatedDeleteBatchInfo(destination.destinationPublicKey(), toDeleteFromAllNamespaces) + } + + if (batchObjects.any { it == null }) { + // stop running here, something like a signing error occurred + return delegate.handleJobFailedPermanently(this, NullPointerException("One or more requests had a null batch request info")) + } + + val allRequests = mutableListOf() + allRequests += batchObjects.requireNoNulls().map { (_, request) -> request } + // add in the deletion if we have any hashes + if (toDeleteRequest != null) allRequests += toDeleteRequest + + val batchResponse = SnodeAPI.getSingleTargetSnode(destination.destinationPublicKey()).bind { snode -> + SnodeAPI.getRawBatchResponse( + snode, + destination.destinationPublicKey(), + allRequests, + sequence = true + ) + } + + try { + val rawResponses = batchResponse.get() + @Suppress("UNCHECKED_CAST") + val responseList = (rawResponses["results"] as List) + // we are always adding in deletions at the end + val deletionResponse = if (toDeleteRequest != null) responseList.last() else null + val deletedHashes = deletionResponse?.let { + @Suppress("UNCHECKED_CAST") + deletionResponse["deleted"] as? List + }?.toSet() ?: emptySet() + + // at this point responseList index should line up with configsRequiringPush index + configsRequiringPush.forEachIndexed { index, config -> + val (toPushMessage, _) = batchObjects[index]!! + val response = responseList[index] + val insertHash = response["hash"] as? String ?: run { + Log.w(TAG, "No hash returned for the configuration in namespace ${config.configNamespace()}") + return@forEachIndexed + } + + // confirm pushed seqno + val thisSeqNo = toPushMessage.seqNo + config.confirmPushed(thisSeqNo) + // wipe any of the existing hashes which we deleted (they may or may not be in this namespace) + if (configFactory.removeHashesFor(config, deletedHashes.toSet())) { + Log.d(TAG, "Successfully removed the deleted hashes from ${config.javaClass.simpleName}") + } + // store the new hash in list of hashes to track against + configFactory.appendHash(config, insertHash) + // dump and write config after successful + configFactory.persist(config) + } + } catch (e: Exception) { + Log.e(TAG, "Error performing batch request") + } + delegate.handleJobSucceeded(this) + } + + fun Destination.destinationPublicKey(): String = when (this) { + is Destination.Contact -> publicKey + is Destination.ClosedGroup -> groupPublicKey + else -> throw NullPointerException("Not public key for this destination") } override fun serialize(): Data { @@ -74,5 +182,4 @@ data class ConfigurationSyncJob(val destination: Destination): Job { return ConfigurationSyncJob(destination) } } - } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index 94d8232430..ab8f3212ae 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -217,6 +217,7 @@ class JobQueue : JobDelegate { GroupAvatarDownloadJob.KEY, BackgroundGroupAddJob.KEY, OpenGroupDeleteJob.KEY, + ConfigurationSyncJob.KEY, ) allJobTypes.forEach { type -> resumePendingJobs(type) diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt index 3cc23da1e7..90f5c680ad 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt @@ -13,6 +13,7 @@ import org.session.libsession.messaging.messages.control.ClosedGroupControlMessa import org.session.libsession.messaging.messages.control.ConfigurationMessage import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate import org.session.libsession.messaging.messages.control.MessageRequestResponse +import org.session.libsession.messaging.messages.control.SharedConfigurationMessage import org.session.libsession.messaging.messages.control.UnsendRequest import org.session.libsession.messaging.messages.visible.LinkPreview import org.session.libsession.messaging.messages.visible.Quote @@ -70,70 +71,113 @@ object MessageSender { } // One-on-One Chats & Closed Groups + @Throws(Exception::class) + fun buildWrappedMessageToSnode(destination: Destination, message: Message, isSyncMessage: Boolean): SnodeMessage { + val storage = MessagingModuleConfiguration.shared.storage + val userPublicKey = storage.getUserPublicKey() + // Set the timestamp, sender and recipient + val messageSendTime = SnodeAPI.nowWithOffset + if (message.sentTimestamp == null) { + message.sentTimestamp = + messageSendTime // Visible messages will already have their sent timestamp set + } + + message.sender = userPublicKey + + when (destination) { + is Destination.Contact -> message.recipient = destination.publicKey + is Destination.ClosedGroup -> message.recipient = destination.groupPublicKey + else -> throw IllegalStateException("Destination should not be an open group.") + } + + val isSelfSend = (message.recipient == userPublicKey) + // Validate the message + if (!message.isValid()) { + throw Error.InvalidMessage + } + // Stop here if this is a self-send, unless it's: + // • a configuration message + // • a sync message + // • a closed group control message of type `new` + var isNewClosedGroupControlMessage = false + if (message is ClosedGroupControlMessage && message.kind is ClosedGroupControlMessage.Kind.New) isNewClosedGroupControlMessage = + true + if (isSelfSend + && message !is ConfigurationMessage + && !isSyncMessage + && !isNewClosedGroupControlMessage + && message !is UnsendRequest + && message !is SharedConfigurationMessage + ) { + throw Error.InvalidMessage + } + // Attach the user's profile if needed + if (message is VisibleMessage) { + message.profile = storage.getUserProfile() + } + if (message is MessageRequestResponse) { + message.profile = storage.getUserProfile() + } + // Convert it to protobuf + val proto = message.toProto() ?: throw Error.ProtoConversionFailed + // Serialize the protobuf + val plaintext = PushTransportDetails.getPaddedMessageBody(proto.toByteArray()) + // Encrypt the serialized protobuf + val ciphertext = when (destination) { + is Destination.Contact -> MessageEncrypter.encrypt(plaintext, destination.publicKey) + is Destination.ClosedGroup -> { + val encryptionKeyPair = + MessagingModuleConfiguration.shared.storage.getLatestClosedGroupEncryptionKeyPair( + destination.groupPublicKey + )!! + MessageEncrypter.encrypt(plaintext, encryptionKeyPair.hexEncodedPublicKey) + } + else -> throw IllegalStateException("Destination should not be open group.") + } + // Wrap the result + val kind: SignalServiceProtos.Envelope.Type + val senderPublicKey: String + when (destination) { + is Destination.Contact -> { + kind = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE + senderPublicKey = "" + } + is Destination.ClosedGroup -> { + kind = SignalServiceProtos.Envelope.Type.CLOSED_GROUP_MESSAGE + senderPublicKey = destination.groupPublicKey + } + else -> throw IllegalStateException("Destination should not be open group.") + } + val wrappedMessage = MessageWrapper.wrap(kind, message.sentTimestamp!!, senderPublicKey, ciphertext) + val base64EncodedData = Base64.encodeBytes(wrappedMessage) + // Send the result + return SnodeMessage( + message.recipient!!, + base64EncodedData, + message.ttl, + messageSendTime + ) + } + private fun sendToSnodeDestination(destination: Destination, message: Message, isSyncMessage: Boolean = false): Promise { val deferred = deferred() val promise = deferred.promise val storage = MessagingModuleConfiguration.shared.storage val userPublicKey = storage.getUserPublicKey() - // Set the timestamp, sender and recipient - if (message.sentTimestamp == null) { - message.sentTimestamp = System.currentTimeMillis() // Visible messages will already have their sent timestamp set - } - val messageSendTime = System.currentTimeMillis() + // recipient will be set later, so initialize it as a function here + val isSelfSend = { message.recipient == userPublicKey } - message.sender = userPublicKey - val isSelfSend = (message.recipient == userPublicKey) // Set the failure handler (need it here already for precondition failure handling) fun handleFailure(error: Exception) { handleFailedMessageSend(message, error) - if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend) { + if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend()) { SnodeModule.shared.broadcaster.broadcast("messageFailed", message.sentTimestamp!!) } deferred.reject(error) } try { - when (destination) { - is Destination.Contact -> message.recipient = destination.publicKey - is Destination.ClosedGroup -> message.recipient = destination.groupPublicKey - else -> throw IllegalStateException("Destination should not be an open group.") - } - // Validate the message - if (!message.isValid()) { throw Error.InvalidMessage } - // Stop here if this is a self-send, unless it's: - // • a configuration message - // • a sync message - // • a closed group control message of type `new` - var isNewClosedGroupControlMessage = false - if (message is ClosedGroupControlMessage && message.kind is ClosedGroupControlMessage.Kind.New) isNewClosedGroupControlMessage = true - if (isSelfSend && message !is ConfigurationMessage && !isSyncMessage && !isNewClosedGroupControlMessage && message !is UnsendRequest) { - handleSuccessfulMessageSend(message, destination) - deferred.resolve(Unit) - return promise - } - // Attach the user's profile if needed - if (message is VisibleMessage) { - message.profile = storage.getUserProfile() - } - if (message is MessageRequestResponse) { - message.profile = storage.getUserProfile() - } - // Convert it to protobuf - val proto = message.toProto() ?: throw Error.ProtoConversionFailed - // Serialize the protobuf - val plaintext = PushTransportDetails.getPaddedMessageBody(proto.toByteArray()) - // Encrypt the serialized protobuf - val ciphertext = when (destination) { - is Destination.Contact -> MessageEncrypter.encrypt(plaintext, destination.publicKey) - is Destination.ClosedGroup -> { - val encryptionKeyPair = MessagingModuleConfiguration.shared.storage.getLatestClosedGroupEncryptionKeyPair(destination.groupPublicKey)!! - MessageEncrypter.encrypt(plaintext, encryptionKeyPair.hexEncodedPublicKey) - } - else -> throw IllegalStateException("Destination should not be open group.") - } - // Wrap the result - val kind: SignalServiceProtos.Envelope.Type - val senderPublicKey: String + val snodeMessage = buildWrappedMessageToSnode(destination, message, isSyncMessage) // TODO: this might change in future for config messages val forkInfo = SnodeAPI.forkInfo val namespaces: List = when { @@ -143,29 +187,6 @@ object MessageSender { && forkInfo.hasNamespaces() -> listOf(Namespace.UNAUTHENTICATED_CLOSED_GROUP, Namespace.DEFAULT) else -> listOf(Namespace.DEFAULT) } - when (destination) { - is Destination.Contact -> { - kind = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE - senderPublicKey = "" - } - is Destination.ClosedGroup -> { - kind = SignalServiceProtos.Envelope.Type.CLOSED_GROUP_MESSAGE - senderPublicKey = destination.groupPublicKey - } - else -> throw IllegalStateException("Destination should not be open group.") - } - val wrappedMessage = MessageWrapper.wrap(kind, message.sentTimestamp!!, senderPublicKey, ciphertext) - // Send the result - if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend) { - SnodeModule.shared.broadcaster.broadcast("calculatingPoW", messageSendTime) - } - val base64EncodedData = Base64.encodeBytes(wrappedMessage) - // Send the result - val timestamp = messageSendTime + SnodeAPI.clockOffset - val snodeMessage = SnodeMessage(message.recipient!!, base64EncodedData, message.ttl, timestamp) - if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend) { - SnodeModule.shared.broadcaster.broadcast("sendingMessage", messageSendTime) - } namespaces.map { namespace -> SnodeAPI.sendMessage(snodeMessage, requiresAuth = false, namespace = namespace) }.let { promises -> var isSuccess = false val promiseCount = promises.size @@ -174,9 +195,6 @@ object MessageSender { promise.success { if (isSuccess) { return@success } // Succeed as soon as the first promise succeeds isSuccess = true - if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend) { - SnodeModule.shared.broadcaster.broadcast("messageSent", messageSendTime) - } val hash = it["hash"] as? String message.serverHash = hash handleSuccessfulMessageSend(message, destination, isSyncMessage) diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt index f24d92b456..9cd7d37fe1 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt @@ -163,7 +163,8 @@ class Poller(private val configFactory: ConfigFactoryProtocol) { val requestSparseArray = SparseArray() // get messages SnodeAPI.buildAuthenticatedRetrieveBatchRequest(snode, userPublicKey)!!.also { personalMessages -> - requestSparseArray[personalMessages.namespace] = personalMessages + // namespaces here should always be set + requestSparseArray[personalMessages.namespace!!] = personalMessages } // get the latest convo info volatile listOfNotNull(configFactory.user, configFactory.contacts, configFactory.convoVolatile).mapNotNull { config -> @@ -172,7 +173,8 @@ class Poller(private val configFactory: ConfigFactoryProtocol) { config.configNamespace() ) }.forEach { request -> - requestSparseArray[request.namespace] = request + // namespaces here should always be set + requestSparseArray[request.namespace!!] = request } val requests = requestSparseArray.valueIterator().asSequence().toList() diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index 06cf891012..61ebf295cd 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -101,7 +101,7 @@ object SnodeAPI { val method: String, val params: Map, @Transient - val namespace: Int + val namespace: Int? ) // assume signatures, pubkey and namespaces are attached in parameters if required // Internal API @@ -365,10 +365,93 @@ object SnodeAPI { return invoke(Snode.Method.Retrieve, snode, parameters, publicKey) } + fun buildAuthenticatedStoreBatchInfo(publicKey: String, namespace: Int, message: SnodeMessage): SnodeBatchRequestInfo? { + val params = mutableMapOf() + // load the message data params into the sub request + // currently loads: + // pubKey + // data + // ttl + // timestamp + params.putAll(message.toJSON()) + params["namespace"] = namespace + + // used for sig generation since it is also the value used in timestamp parameter + val messageTimestamp = message.timestamp + + val userEd25519KeyPair = try { + MessagingModuleConfiguration.shared.getUserED25519KeyPair() ?: return null + } catch (e: Exception) { + return null + } + + val ed25519PublicKey = userEd25519KeyPair.publicKey.asHexString + val signature = ByteArray(Sign.BYTES) + val verificationData = "store$namespace$messageTimestamp".toByteArray() + try { + sodium.cryptoSignDetached( + signature, + verificationData, + verificationData.size.toLong(), + userEd25519KeyPair.secretKey.asBytes + ) + } catch (e: Exception) { + Log.e("Loki", "Signing data failed with user secret key", e) + } + // timestamp already set + params["pubkey_ed25519"] = ed25519PublicKey + params["signature"] = Base64.encodeBytes(signature) + return SnodeBatchRequestInfo( + Snode.Method.SendMessage.rawValue, + params, + namespace + ) + } + + /** + * Message hashes can be shared across multiple namespaces (for a single public key destination) + * @param publicKey the destination's identity public key to delete from (05...) + * @param messageHashes a list of stored message hashes to delete from the server + * @param required indicates that *at least one* message in the list is deleted from the server, otherwise it will return 404 + */ + fun buildAuthenticatedDeleteBatchInfo(publicKey: String, messageHashes: List, required: Boolean = false): SnodeBatchRequestInfo? { + val params = mutableMapOf( + "pubkey" to publicKey, + "required" to required, // could be omitted technically but explicit here + "messages" to messageHashes + ) + val userEd25519KeyPair = try { + MessagingModuleConfiguration.shared.getUserED25519KeyPair() ?: return null + } catch (e: Exception) { + return null + } + val ed25519PublicKey = userEd25519KeyPair.publicKey.asHexString + val signature = ByteArray(Sign.BYTES) + val verificationData = "delete${messageHashes.joinToString("")}".toByteArray() + try { + sodium.cryptoSignDetached( + signature, + verificationData, + verificationData.size.toLong(), + userEd25519KeyPair.secretKey.asBytes + ) + } catch (e: Exception) { + Log.e("Loki", "Signing data failed with user secret key", e) + return null + } + params["pubkey_ed25519"] = ed25519PublicKey + params["signature"] = Base64.encodeBytes(signature) + return SnodeBatchRequestInfo( + Snode.Method.Retrieve.rawValue, + params, + null + ) + } + fun buildAuthenticatedRetrieveBatchRequest(snode: Snode, publicKey: String, namespace: Int = 0): SnodeBatchRequestInfo? { val lastHashValue = database.getLastMessageHashValue(snode, publicKey, namespace) ?: "" val params = mutableMapOf( - "pubKey" to publicKey, + "pubkey" to publicKey, "last_hash" to lastHashValue, ) val userEd25519KeyPair = try { @@ -405,11 +488,11 @@ object SnodeAPI { ) } - fun getRawBatchResponse(snode: Snode, publicKey: String, requests: List): RawResponsePromise { + fun getRawBatchResponse(snode: Snode, publicKey: String, requests: List, sequence: Boolean = false): RawResponsePromise { val parameters = mutableMapOf( "requests" to requests ) - return invoke(Snode.Method.Batch, snode, parameters, publicKey) + return invoke(if (sequence) Snode.Method.Sequence else Snode.Method.Batch, snode, parameters, publicKey) } fun getExpiries(messageHashes: List, publicKey: String) : RawResponsePromise { diff --git a/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt b/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt index 6844aef98e..b05496c828 100644 --- a/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/utilities/ConfigFactoryProtocol.kt @@ -12,4 +12,6 @@ interface ConfigFactoryProtocol { fun persist(forConfigObject: ConfigBase) fun appendHash(configObject: ConfigBase, hash: String) fun notifyUpdates(forConfigObject: ConfigBase) + fun getHashesFor(forConfigObject: ConfigBase): List + fun removeHashesFor(config: ConfigBase, deletedHashes: Set): Boolean } \ No newline at end of file diff --git a/libsignal/src/main/java/org/session/libsignal/utilities/Snode.kt b/libsignal/src/main/java/org/session/libsignal/utilities/Snode.kt index 90b633e388..28f8aeb03b 100644 --- a/libsignal/src/main/java/org/session/libsignal/utilities/Snode.kt +++ b/libsignal/src/main/java/org/session/libsignal/utilities/Snode.kt @@ -12,6 +12,7 @@ class Snode(val address: String, val port: Int, val publicKeySet: KeySet?) { Info("info"), DeleteAll("delete_all"), Batch("batch"), + Sequence("sequence"), Expire("expire"), GetExpiries("get_expiries") }