From 51c30200495a157d229a686aa7aa49adc1a7a45e Mon Sep 17 00:00:00 2001 From: 0x330a <92654767+0x330a@users.noreply.github.com> Date: Thu, 14 Sep 2023 17:00:54 +1000 Subject: [PATCH] feat: group message from poller decoding / sending & receiving --- .../securesms/database/Storage.kt | 21 ++++++- .../securesms/dependencies/ConfigFactory.kt | 7 ++- .../securesms/dependencies/PollerFactory.kt | 4 +- .../dependencies/SessionUtilModule.kt | 5 +- libsession-util/src/main/cpp/group_keys.cpp | 39 ++++++++++++ .../loki/messenger/libsession_util/Config.kt | 6 ++ .../libsession/database/StorageProtocol.kt | 2 +- .../messaging/messages/Destination.kt | 3 + .../messages/visible/VisibleMessage.kt | 2 +- .../sending_receiving/MessageReceiver.kt | 63 +++++++++++-------- .../sending_receiving/MessageSender.kt | 23 ++++++- .../pollers/ClosedGroupPoller.kt | 38 +++++++---- .../org/session/libsession/snode/SnodeAPI.kt | 21 +++---- .../session/libsession/utilities/GroupUtil.kt | 6 +- 14 files changed, 177 insertions(+), 63 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index b0d9c0235f..53eeddc8cd 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -84,6 +84,7 @@ import org.session.libsignal.utilities.KeyHelper import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.SessionId import org.session.libsignal.utilities.guava.Optional +import org.session.libsignal.utilities.toHexString import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper import org.thoughtcrime.securesms.database.model.MessageId import org.thoughtcrime.securesms.database.model.ReactionRecord @@ -322,6 +323,9 @@ open class Storage( ?.let { SodiumUtilities.sessionId(getUserPublicKey()!!, message.sender!!, it) } ?: false val group: Optional = when { openGroupID != null -> Optional.of(SignalServiceGroup(openGroupID.toByteArray(), SignalServiceGroup.GroupType.PUBLIC_CHAT)) + groupPublicKey != null && groupPublicKey.startsWith(IdPrefix.GROUP.value) -> { + Optional.of(SignalServiceGroup(Hex.fromStringCondensed(groupPublicKey), SignalServiceGroup.GroupType.SIGNAL)) + } groupPublicKey != null -> { val doubleEncoded = GroupUtil.doubleEncodeGroupID(groupPublicKey) Optional.of(SignalServiceGroup(GroupUtil.getDecodedGroupIDAsData(doubleEncoded), SignalServiceGroup.GroupType.SIGNAL)) @@ -334,7 +338,14 @@ open class Storage( val targetAddress = if ((isUserSender || isUserBlindedSender) && !message.syncTarget.isNullOrEmpty()) { fromSerialized(message.syncTarget!!) } else if (group.isPresent) { - fromSerialized(GroupUtil.getEncodedId(group.get())) + val idHex = group.get().groupId.toHexString() + if (idHex.startsWith(IdPrefix.GROUP.value)) { + fromSerialized(idHex) + } else { + fromSerialized(GroupUtil.getEncodedId(group.get())) + } + } else if (message.recipient?.startsWith(IdPrefix.GROUP.value) == true) { + fromSerialized(message.recipient!!) } else { senderAddress } @@ -1121,7 +1132,7 @@ open class Storage( mmsDB.markAsSent(infoMessageID, true) } - override fun isClosedGroup(publicKey: String): Boolean { + override fun isLegacyClosedGroup(publicKey: String): Boolean { return DatabaseComponent.get(context).lokiAPIDatabase().isClosedGroup(publicKey) } @@ -1250,10 +1261,14 @@ open class Storage( return if (!openGroupID.isNullOrEmpty()) { val recipient = Recipient.from(context, fromSerialized(GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())), false) database.getThreadIdIfExistsFor(recipient).let { if (it == -1L) null else it } - } else if (!groupPublicKey.isNullOrEmpty()) { + } else if (!groupPublicKey.isNullOrEmpty() && !groupPublicKey.startsWith(IdPrefix.GROUP.value)) { val recipient = Recipient.from(context, fromSerialized(GroupUtil.doubleEncodeGroupID(groupPublicKey)), false) if (createThread) database.getOrCreateThreadIdFor(recipient) else database.getThreadIdIfExistsFor(recipient).let { if (it == -1L) null else it } + } else if (!groupPublicKey.isNullOrEmpty()) { + val recipient = Recipient.from(context, fromSerialized(groupPublicKey), false) + if (createThread) database.getOrCreateThreadIdFor(recipient) + else database.getThreadIdIfExistsFor(recipient).let { if (it == -1L) null else it } } else { val recipient = Recipient.from(context, fromSerialized(publicKey), false) if (createThread) database.getOrCreateThreadIdFor(recipient) diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt index f796c849e5..bdab987582 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt @@ -16,6 +16,7 @@ import org.session.libsession.utilities.ConfigFactoryUpdateListener import org.session.libsession.utilities.TextSecurePreferences import org.session.libsignal.protos.SignalServiceProtos.SharedConfigMessage import org.session.libsignal.utilities.Hex +import org.session.libsignal.utilities.IdPrefix import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.SessionId import org.thoughtcrime.securesms.database.ConfigDatabase @@ -314,7 +315,11 @@ class ConfigFactory( val userGroups = userGroups ?: return false // Not handling the `hidden` behaviour for legacy groups so just indicate the existence - return (userGroups.getLegacyGroupInfo(groupPublicKey) != null) + return if (groupPublicKey.startsWith(IdPrefix.GROUP.value)) { + userGroups.getClosedGroup(groupPublicKey) != null + } else { + userGroups.getLegacyGroupInfo(groupPublicKey) != null + } } else if (publicKey == userPublicKey) { val user = user ?: return false diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/PollerFactory.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/PollerFactory.kt index 5482743191..51b988ab11 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/PollerFactory.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/PollerFactory.kt @@ -1,6 +1,8 @@ package org.thoughtcrime.securesms.dependencies import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.plus import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller import org.session.libsignal.utilities.SessionId import java.util.concurrent.ConcurrentHashMap @@ -13,7 +15,7 @@ class PollerFactory(private val scope: CoroutineScope, private val configFactory val activeGroup = configFactory.userGroups?.getClosedGroup(sessionId.hexString()) ?: return null // TODO: add check for active group being invited / approved etc return pollers.getOrPut(sessionId) { - ClosedGroupPoller(scope, sessionId, configFactory) + ClosedGroupPoller(scope + SupervisorJob(), sessionId, configFactory) } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt index 44b4386028..1b411cec83 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt @@ -8,8 +8,6 @@ import dagger.hilt.android.qualifiers.ApplicationContext import dagger.hilt.components.SingletonComponent import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.plus import org.session.libsession.utilities.ConfigFactoryUpdateListener import org.session.libsession.utilities.TextSecurePreferences import org.thoughtcrime.securesms.crypto.KeyPairUtilities @@ -42,8 +40,7 @@ object SessionUtilModule { @Provides @Named(POLLER_SCOPE) - fun providePollerScope(@ApplicationContext applicationContext: Context) = - GlobalScope + SupervisorJob() + fun providePollerScope(@ApplicationContext applicationContext: Context): CoroutineScope = GlobalScope @Provides @Singleton diff --git a/libsession-util/src/main/cpp/group_keys.cpp b/libsession-util/src/main/cpp/group_keys.cpp index cc94c5c901..9ad1644fd4 100644 --- a/libsession-util/src/main/cpp/group_keys.cpp +++ b/libsession-util/src/main/cpp/group_keys.cpp @@ -151,3 +151,42 @@ Java_network_loki_messenger_libsession_1util_GroupKeysConfig_free(JNIEnv *env, j auto ptr = ptrToKeys(env, thiz); delete ptr; } + +extern "C" +JNIEXPORT jbyteArray JNICALL +Java_network_loki_messenger_libsession_1util_GroupKeysConfig_encrypt(JNIEnv *env, jobject thiz, + jbyteArray plaintext) { + auto ptr = ptrToKeys(env, thiz); + auto plaintext_ustring = util::ustring_from_bytes(env, plaintext); + auto enc = ptr->encrypt_message(plaintext_ustring); + return util::bytes_from_ustring(env, enc); +} + +extern "C" +JNIEXPORT jbyteArray JNICALL +Java_network_loki_messenger_libsession_1util_GroupKeysConfig_decrypt(JNIEnv *env, jobject thiz, + jbyteArray ciphertext) { + auto ptr = ptrToKeys(env, thiz); + auto ciphertext_ustring = util::ustring_from_bytes(env, ciphertext); + auto plaintext = ptr->decrypt_message(ciphertext_ustring); + if (plaintext) { + return util::bytes_from_ustring(env, *plaintext); + } + + return nullptr; +} +extern "C" +JNIEXPORT jobject JNICALL +Java_network_loki_messenger_libsession_1util_GroupKeysConfig_keys(JNIEnv *env, jobject thiz) { + auto ptr = ptrToKeys(env, thiz); + auto keys = ptr->group_keys(); + jclass stack = env->FindClass("java/util/Stack"); + jmethodID init = env->GetMethodID(stack, "", "()V"); + jobject our_stack = env->NewObject(stack, init); + jmethodID push = env->GetMethodID(stack, "push", "(Ljava/lang/Object;)Ljava/lang/Object;"); + for (auto& key : keys) { + auto key_bytes = util::bytes_from_ustring(env, key); + env->CallObjectMethod(our_stack, push, key_bytes); + } + return our_stack; +} \ No newline at end of file diff --git a/libsession-util/src/main/java/network/loki/messenger/libsession_util/Config.kt b/libsession-util/src/main/java/network/loki/messenger/libsession_util/Config.kt index c74f954215..aba30c0869 100644 --- a/libsession-util/src/main/java/network/loki/messenger/libsession_util/Config.kt +++ b/libsession-util/src/main/java/network/loki/messenger/libsession_util/Config.kt @@ -304,4 +304,10 @@ class GroupKeysConfig(private val pointer: Long): Closeable { override fun close() { free() } + + external fun encrypt(plaintext: ByteArray): ByteArray + external fun decrypt(ciphertext: ByteArray): ByteArray? + + external fun keys(): Stack + } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt index bc93703f32..1d5f99c6d6 100644 --- a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt @@ -148,7 +148,7 @@ interface StorageProtocol { name: String, members: Collection, admins: Collection, sentTimestamp: Long) fun insertOutgoingInfoMessage(context: Context, groupID: String, type: SignalServiceGroup.Type, name: String, members: Collection, admins: Collection, threadID: Long, sentTimestamp: Long) - fun isClosedGroup(publicKey: String): Boolean + fun isLegacyClosedGroup(publicKey: String): Boolean fun getClosedGroupEncryptionKeyPairs(groupPublicKey: String): MutableList fun getLatestClosedGroupEncryptionKeyPair(groupPublicKey: String): ECKeyPair? fun updateFormationTimestamp(groupID: String, formationTimestamp: Long) diff --git a/libsession/src/main/java/org/session/libsession/messaging/messages/Destination.kt b/libsession/src/main/java/org/session/libsession/messaging/messages/Destination.kt index 25fa6284bc..16ddc07657 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/messages/Destination.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/messages/Destination.kt @@ -61,6 +61,9 @@ sealed class Destination { groupInboxId.last() ) } + address.isClosedGroup -> { + ClosedGroup(address.serialize()) + } else -> { throw Exception("TODO: Handle legacy closed groups.") } diff --git a/libsession/src/main/java/org/session/libsession/messaging/messages/visible/VisibleMessage.kt b/libsession/src/main/java/org/session/libsession/messaging/messages/visible/VisibleMessage.kt index 63c84c915e..d6893f0d51 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/messages/visible/VisibleMessage.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/messages/visible/VisibleMessage.kt @@ -130,7 +130,7 @@ class VisibleMessage( // if it receives a message without the current expiration timer value attached to it... val storage = MessagingModuleConfiguration.shared.storage val context = MessagingModuleConfiguration.shared.context - val expiration = if (storage.isClosedGroup(recipient!!)) { + val expiration = if (storage.isLegacyClosedGroup(recipient!!)) { Recipient.from(context, Address.fromSerialized(GroupUtil.doubleEncodeGroupID(recipient!!)), false).expireMessages } else { Recipient.from(context, Address.fromSerialized(recipient!!), false).expireMessages diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageReceiver.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageReceiver.kt index dd592c9ae7..216170b1ca 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageReceiver.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageReceiver.kt @@ -93,33 +93,46 @@ object MessageReceiver { } SignalServiceProtos.Envelope.Type.CLOSED_GROUP_MESSAGE -> { val hexEncodedGroupPublicKey = envelope.source - if (hexEncodedGroupPublicKey == null || !MessagingModuleConfiguration.shared.storage.isClosedGroup(hexEncodedGroupPublicKey)) { - throw Error.InvalidGroupPublicKey - } - val encryptionKeyPairs = MessagingModuleConfiguration.shared.storage.getClosedGroupEncryptionKeyPairs(hexEncodedGroupPublicKey) - if (encryptionKeyPairs.isEmpty()) { - throw Error.NoGroupKeyPair - } - // Loop through all known group key pairs in reverse order (i.e. try the latest key pair first (which'll more than - // likely be the one we want) but try older ones in case that didn't work) - var encryptionKeyPair = encryptionKeyPairs.removeLast() - fun decrypt() { - try { - val decryptionResult = MessageDecrypter.decrypt(ciphertext.toByteArray(), encryptionKeyPair) - plaintext = decryptionResult.first - sender = decryptionResult.second - } catch (e: Exception) { - if (encryptionKeyPairs.isNotEmpty()) { - encryptionKeyPair = encryptionKeyPairs.removeLast() - decrypt() - } else { - Log.e("Loki", "Failed to decrypt group message", e) - throw e + val sessionId = SessionId.from(hexEncodedGroupPublicKey) + if (sessionId.prefix == IdPrefix.GROUP) { + val configFactory = MessagingModuleConfiguration.shared.configFactory + configFactory.getGroupKeysConfig(sessionId)?.use { config -> + plaintext = config.decrypt(ciphertext.toByteArray()) + sender = userPublicKey + groupPublicKey = envelope.source + } + if (plaintext == null) { + throw Error.DecryptionFailed + } + } else { + if (!MessagingModuleConfiguration.shared.storage.isLegacyClosedGroup(hexEncodedGroupPublicKey)) { + throw Error.InvalidGroupPublicKey + } + val encryptionKeyPairs = MessagingModuleConfiguration.shared.storage.getClosedGroupEncryptionKeyPairs(hexEncodedGroupPublicKey) + if (encryptionKeyPairs.isEmpty()) { + throw Error.NoGroupKeyPair + } + // Loop through all known group key pairs in reverse order (i.e. try the latest key pair first (which'll more than + // likely be the one we want) but try older ones in case that didn't work) + var encryptionKeyPair = encryptionKeyPairs.removeLast() + fun decrypt() { + try { + val decryptionResult = MessageDecrypter.decrypt(ciphertext.toByteArray(), encryptionKeyPair) + plaintext = decryptionResult.first + sender = decryptionResult.second + } catch (e: Exception) { + if (encryptionKeyPairs.isNotEmpty()) { + encryptionKeyPair = encryptionKeyPairs.removeLast() + decrypt() + } else { + Log.e("Loki", "Failed to decrypt group message", e) + throw e + } } } + groupPublicKey = envelope.source + decrypt() } - groupPublicKey = envelope.source - decrypt() } else -> { throw Error.UnknownEnvelopeType @@ -174,7 +187,7 @@ object MessageReceiver { // If the message failed to process the first time around we retry it later (if the error is retryable). In this case the timestamp // will already be in the database but we don't want to treat the message as a duplicate. The isRetry flag is a simple workaround // for this issue. - if (groupPublicKey != null && groupPublicKey !in (currentClosedGroups ?: emptySet())) { + if (groupPublicKey != null && groupPublicKey !in (currentClosedGroups ?: emptySet()) && groupPublicKey?.startsWith(IdPrefix.GROUP.value) != true) { throw Error.NoGroupThread } if ((message is ClosedGroupControlMessage && message.kind is ClosedGroupControlMessage.Kind.New) || message is SharedConfigurationMessage) { diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt index 839d620109..716c57a2fe 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt @@ -75,6 +75,7 @@ object MessageSender { @Throws(Exception::class) fun buildWrappedMessageToSnode(destination: Destination, message: Message, isSyncMessage: Boolean): SnodeMessage { val storage = MessagingModuleConfiguration.shared.storage + val configFactory = MessagingModuleConfiguration.shared.configFactory val userPublicKey = storage.getUserPublicKey() // Set the timestamp, sender and recipient val messageSendTime = SnodeAPI.nowWithOffset @@ -88,6 +89,7 @@ object MessageSender { when (destination) { is Destination.Contact -> message.recipient = destination.publicKey is Destination.LegacyClosedGroup -> message.recipient = destination.groupPublicKey + is Destination.ClosedGroup -> message.recipient = destination.publicKey else -> throw IllegalStateException("Destination should not be an open group.") } @@ -133,6 +135,12 @@ object MessageSender { )!! MessageEncrypter.encrypt(plaintext, encryptionKeyPair.hexEncodedPublicKey) } + is Destination.ClosedGroup -> { + val groupKeys = configFactory.getGroupKeysConfig(SessionId.from(destination.publicKey)) ?: throw Error.NoKeyPair + groupKeys.use { keys -> + keys.encrypt(plaintext) + } + } else -> throw IllegalStateException("Destination should not be open group.") } // Wrap the result @@ -147,6 +155,10 @@ object MessageSender { kind = SignalServiceProtos.Envelope.Type.CLOSED_GROUP_MESSAGE senderPublicKey = destination.groupPublicKey } + is Destination.ClosedGroup -> { + kind = SignalServiceProtos.Envelope.Type.CLOSED_GROUP_MESSAGE + senderPublicKey = destination.publicKey + } else -> throw IllegalStateException("Destination should not be open group.") } val wrappedMessage = MessageWrapper.wrap(kind, message.sentTimestamp!!, senderPublicKey, ciphertext) @@ -165,6 +177,7 @@ object MessageSender { val deferred = deferred() val promise = deferred.promise val storage = MessagingModuleConfiguration.shared.storage + val configFactory = MessagingModuleConfiguration.shared.configFactory val userPublicKey = storage.getUserPublicKey() // recipient will be set later, so initialize it as a function here @@ -189,7 +202,15 @@ object MessageSender { && forkInfo.hasNamespaces() -> listOf(Namespace.UNAUTHENTICATED_CLOSED_GROUP, Namespace.DEFAULT) else -> listOf(Namespace.DEFAULT) } - namespaces.map { namespace -> SnodeAPI.sendMessage(snodeMessage, requiresAuth = false, namespace = namespace) }.let { promises -> + namespaces.map { namespace -> + if (destination is Destination.ClosedGroup) { + // possibly handle a failure for no user groups or no closed group signing key? + val signingKey = configFactory.userGroups!!.getClosedGroup(destination.publicKey)!!.signingKey() + SnodeAPI.sendAuthenticatedMessage(snodeMessage, signingKey, namespace = namespace) + } else { + SnodeAPI.sendMessage(snodeMessage, requiresAuth = false, namespace = namespace) + } + }.let { promises -> var isSuccess = false val promiseCount = promises.size val errorCount = AtomicInteger(0) diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt index 70d24e5504..8520010166 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt @@ -10,6 +10,9 @@ import network.loki.messenger.libsession_util.GroupInfoConfig import network.loki.messenger.libsession_util.GroupKeysConfig import network.loki.messenger.libsession_util.GroupMembersConfig import network.loki.messenger.libsession_util.util.GroupInfo +import org.session.libsession.messaging.jobs.BatchMessageReceiveJob +import org.session.libsession.messaging.jobs.JobQueue +import org.session.libsession.messaging.jobs.MessageReceiveParameters import org.session.libsession.snode.RawResponse import org.session.libsession.snode.SnodeAPI import org.session.libsession.utilities.ConfigFactoryProtocol @@ -17,6 +20,7 @@ import org.session.libsignal.utilities.Base64 import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Namespace import org.session.libsignal.utilities.SessionId +import org.session.libsignal.utilities.Snode class ClosedGroupPoller(private val executor: CoroutineScope, private val closedGroupSessionId: SessionId, @@ -50,6 +54,7 @@ class ClosedGroupPoller(private val executor: CoroutineScope, companion object { const val POLL_INTERVAL = 3_000L + const val ENABLE_LOGGING = false } private var isRunning: Boolean = false @@ -58,7 +63,7 @@ class ClosedGroupPoller(private val executor: CoroutineScope, fun start() { if (isRunning) return // already started, don't restart - Log.d("ClosedGroupPoller", "Starting closed group poller for ${closedGroupSessionId.hexString().take(4)}") + if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Starting closed group poller for ${closedGroupSessionId.hexString().take(4)}") job?.cancel() job = executor.launch(Dispatchers.IO) { val closedGroups = configFactoryProtocol.userGroups?: return@launch @@ -69,7 +74,7 @@ class ClosedGroupPoller(private val executor: CoroutineScope, if (nextPoll != null) { delay(nextPoll) } else { - Log.d("ClosedGroupPoller", "Stopping the closed group poller") + if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Stopping the closed group poller") return@launch } } @@ -133,13 +138,13 @@ class ClosedGroupPoller(private val executor: CoroutineScope, // TODO: add the extend duration TTLs for known hashes here // if poll result body is null here we don't have any things ig - Log.d("ClosedGroupPoller", "Poll results @${SnodeAPI.nowWithOffset}:") + if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Poll results @${SnodeAPI.nowWithOffset}:") (pollResult["results"] as List).forEachIndexed { index, response -> when (index) { keysIndex -> handleKeyPoll(response, keys, info, members) infoIndex -> handleInfo(response, info) membersIndex -> handleMembers(response, members) - messageIndex -> handleMessages(response, keys) + messageIndex -> handleMessages(response, snode) } } @@ -149,7 +154,7 @@ class ClosedGroupPoller(private val executor: CoroutineScope, members.free() } catch (e: Exception) { - Log.e("GroupPoller", "Polling failed for group", e) + if (ENABLE_LOGGING) Log.e("GroupPoller", "Polling failed for group", e) return POLL_INTERVAL } return POLL_INTERVAL // this might change in future @@ -158,7 +163,7 @@ class ClosedGroupPoller(private val executor: CoroutineScope, private fun parseMessages(response: RawResponse): List { val body = response["body"] as? RawResponse if (body == null) { - Log.e("GroupPoller", "Batch parse messages contained no body!") + if (ENABLE_LOGGING) Log.e("GroupPoller", "Batch parse messages contained no body!") return emptyList() } val messages = body["messages"] as? List<*> ?: return emptyList() @@ -179,7 +184,7 @@ class ClosedGroupPoller(private val executor: CoroutineScope, // get all the data to hash objects and process them parseMessages(response).forEach { (message, hash, timestamp) -> keysConfig.loadKey(message, hash, timestamp, infoConfig, membersConfig) - Log.d("ClosedGroupPoller", "Merged $hash for keys on ${closedGroupSessionId.hexString()}") + if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Merged $hash for keys on ${closedGroupSessionId.hexString()}") } } @@ -187,7 +192,7 @@ class ClosedGroupPoller(private val executor: CoroutineScope, infoConfig: GroupInfoConfig) { parseMessages(response).forEach { (message, hash, _) -> infoConfig.merge(hash to message) - Log.d("ClosedGroupPoller", "Merged $hash for info on ${closedGroupSessionId.hexString()}") + if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Merged $hash for info on ${closedGroupSessionId.hexString()}") } } @@ -195,15 +200,22 @@ class ClosedGroupPoller(private val executor: CoroutineScope, membersConfig: GroupMembersConfig) { parseMessages(response).forEach { (message, hash, _) -> membersConfig.merge(hash to message) - Log.d("ClosedGroupPoller", "Merged $hash for members on ${closedGroupSessionId.hexString()}") + if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Merged $hash for members on ${closedGroupSessionId.hexString()}") } } - private fun handleMessages(response: RawResponse, keysConfig: GroupKeysConfig) { - val messages = parseMessages(response) - if (messages.isNotEmpty()) { - // TODO: process decrypting bundles + private fun handleMessages(response: RawResponse, snode: Snode) { + val body = response["body"] as RawResponse + val messages = SnodeAPI.parseRawMessagesResponse(body, snode, closedGroupSessionId.hexString()) + val parameters = messages.map { (envelope, serverHash) -> + MessageReceiveParameters(envelope.toByteArray(), serverHash = serverHash) } + parameters.chunked(BatchMessageReceiveJob.BATCH_DEFAULT_NUMBER).forEach { chunk -> + val job = BatchMessageReceiveJob(chunk) + JobQueue.shared.add(job) + } + if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "namespace 0 message size: ${messages.size}") + } } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index b2bf7583b6..7ef0a29ddb 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -3,7 +3,6 @@ package org.session.libsession.snode import android.os.Build -import androidx.annotation.WorkerThread import com.goterl.lazysodium.LazySodiumAndroid import com.goterl.lazysodium.SodiumAndroid import com.goterl.lazysodium.exceptions.SodiumException @@ -655,34 +654,32 @@ object SnodeAPI { } } - @WorkerThread - fun sendAuthenticatedMessage(message: SnodeMessage, signingKey: ByteArray, namespace: Int): RawResponse { + fun sendAuthenticatedMessage(message: SnodeMessage, signingKey: ByteArray, namespace: Int): RawResponsePromise { val pubKey = message.recipient return retryIfNeeded(maxRetryCount) { - val timestamp = nowWithOffset val signature = ByteArray(Sign.BYTES) // assume namespace here is non-zero, as zero namespace doesn't require auth - val verificationData = "store$namespace$timestamp".toByteArray() + val sigTimestamp = nowWithOffset + val verificationData = "store$namespace$sigTimestamp".toByteArray() try { sodium.cryptoSignDetached(signature, verificationData, verificationData.size.toLong(), signingKey) } catch (exception: Exception) { return@retryIfNeeded Promise.ofFail(Error.SigningFailed) } - val parameters = mapOf( - "pubKey" to pubKey, - "data" to message.data, - "timestamp" to timestamp.toString(), - "sig_timestamp" to timestamp.toString(), - "signature" to Base64.encodeBytes(verificationData) + val parameters = message.toJSON().toMutableMap() + + parameters += mapOf( + "sig_timestamp" to sigTimestamp, + "signature" to Base64.encodeBytes(signature) ) getSingleTargetSnode(pubKey).bind { targetSnode -> invoke(Snode.Method.SendMessage, targetSnode, parameters, pubKey) } - }.get() + } } fun sendMessage(message: SnodeMessage, requiresAuth: Boolean = false, namespace: Int = 0): RawResponsePromise { diff --git a/libsession/src/main/java/org/session/libsession/utilities/GroupUtil.kt b/libsession/src/main/java/org/session/libsession/utilities/GroupUtil.kt index 7e43f11ff4..07ab1605b4 100644 --- a/libsession/src/main/java/org/session/libsession/utilities/GroupUtil.kt +++ b/libsession/src/main/java/org/session/libsession/utilities/GroupUtil.kt @@ -3,6 +3,7 @@ package org.session.libsession.utilities import org.session.libsession.messaging.open_groups.OpenGroup import org.session.libsignal.messages.SignalServiceGroup import org.session.libsignal.utilities.Hex +import org.session.libsignal.utilities.IdPrefix import org.session.libsignal.utilities.SessionId import java.io.IOException @@ -30,7 +31,9 @@ object GroupUtil { @JvmStatic fun getEncodedClosedGroupID(groupID: ByteArray): String { - return LEGACY_CLOSED_GROUP_PREFIX + Hex.toStringCondensed(groupID) + val hex = Hex.toStringCondensed(groupID) + if (hex.startsWith(IdPrefix.GROUP.value)) throw IllegalArgumentException("Trying to encode a new closed group") + return LEGACY_CLOSED_GROUP_PREFIX + hex } @JvmStatic @@ -92,6 +95,7 @@ object GroupUtil { @JvmStatic @Throws(IOException::class) fun doubleEncodeGroupID(groupPublicKey: String): String { + if (groupPublicKey.startsWith(IdPrefix.GROUP.value)) throw IllegalArgumentException("Trying to double encode a new closed group") return getEncodedClosedGroupID(getEncodedClosedGroupID(Hex.fromStringCondensed(groupPublicKey)).toByteArray()) }