mirror of
https://github.com/oxen-io/session-android.git
synced 2025-08-24 00:07:47 +00:00
feat: group message from poller decoding / sending & receiving
This commit is contained in:
@@ -148,7 +148,7 @@ interface StorageProtocol {
|
||||
name: String, members: Collection<String>, admins: Collection<String>, sentTimestamp: Long)
|
||||
fun insertOutgoingInfoMessage(context: Context, groupID: String, type: SignalServiceGroup.Type, name: String,
|
||||
members: Collection<String>, admins: Collection<String>, threadID: Long, sentTimestamp: Long)
|
||||
fun isClosedGroup(publicKey: String): Boolean
|
||||
fun isLegacyClosedGroup(publicKey: String): Boolean
|
||||
fun getClosedGroupEncryptionKeyPairs(groupPublicKey: String): MutableList<ECKeyPair>
|
||||
fun getLatestClosedGroupEncryptionKeyPair(groupPublicKey: String): ECKeyPair?
|
||||
fun updateFormationTimestamp(groupID: String, formationTimestamp: Long)
|
||||
|
@@ -61,6 +61,9 @@ sealed class Destination {
|
||||
groupInboxId.last()
|
||||
)
|
||||
}
|
||||
address.isClosedGroup -> {
|
||||
ClosedGroup(address.serialize())
|
||||
}
|
||||
else -> {
|
||||
throw Exception("TODO: Handle legacy closed groups.")
|
||||
}
|
||||
|
@@ -130,7 +130,7 @@ class VisibleMessage(
|
||||
// if it receives a message without the current expiration timer value attached to it...
|
||||
val storage = MessagingModuleConfiguration.shared.storage
|
||||
val context = MessagingModuleConfiguration.shared.context
|
||||
val expiration = if (storage.isClosedGroup(recipient!!)) {
|
||||
val expiration = if (storage.isLegacyClosedGroup(recipient!!)) {
|
||||
Recipient.from(context, Address.fromSerialized(GroupUtil.doubleEncodeGroupID(recipient!!)), false).expireMessages
|
||||
} else {
|
||||
Recipient.from(context, Address.fromSerialized(recipient!!), false).expireMessages
|
||||
|
@@ -93,33 +93,46 @@ object MessageReceiver {
|
||||
}
|
||||
SignalServiceProtos.Envelope.Type.CLOSED_GROUP_MESSAGE -> {
|
||||
val hexEncodedGroupPublicKey = envelope.source
|
||||
if (hexEncodedGroupPublicKey == null || !MessagingModuleConfiguration.shared.storage.isClosedGroup(hexEncodedGroupPublicKey)) {
|
||||
throw Error.InvalidGroupPublicKey
|
||||
}
|
||||
val encryptionKeyPairs = MessagingModuleConfiguration.shared.storage.getClosedGroupEncryptionKeyPairs(hexEncodedGroupPublicKey)
|
||||
if (encryptionKeyPairs.isEmpty()) {
|
||||
throw Error.NoGroupKeyPair
|
||||
}
|
||||
// Loop through all known group key pairs in reverse order (i.e. try the latest key pair first (which'll more than
|
||||
// likely be the one we want) but try older ones in case that didn't work)
|
||||
var encryptionKeyPair = encryptionKeyPairs.removeLast()
|
||||
fun decrypt() {
|
||||
try {
|
||||
val decryptionResult = MessageDecrypter.decrypt(ciphertext.toByteArray(), encryptionKeyPair)
|
||||
plaintext = decryptionResult.first
|
||||
sender = decryptionResult.second
|
||||
} catch (e: Exception) {
|
||||
if (encryptionKeyPairs.isNotEmpty()) {
|
||||
encryptionKeyPair = encryptionKeyPairs.removeLast()
|
||||
decrypt()
|
||||
} else {
|
||||
Log.e("Loki", "Failed to decrypt group message", e)
|
||||
throw e
|
||||
val sessionId = SessionId.from(hexEncodedGroupPublicKey)
|
||||
if (sessionId.prefix == IdPrefix.GROUP) {
|
||||
val configFactory = MessagingModuleConfiguration.shared.configFactory
|
||||
configFactory.getGroupKeysConfig(sessionId)?.use { config ->
|
||||
plaintext = config.decrypt(ciphertext.toByteArray())
|
||||
sender = userPublicKey
|
||||
groupPublicKey = envelope.source
|
||||
}
|
||||
if (plaintext == null) {
|
||||
throw Error.DecryptionFailed
|
||||
}
|
||||
} else {
|
||||
if (!MessagingModuleConfiguration.shared.storage.isLegacyClosedGroup(hexEncodedGroupPublicKey)) {
|
||||
throw Error.InvalidGroupPublicKey
|
||||
}
|
||||
val encryptionKeyPairs = MessagingModuleConfiguration.shared.storage.getClosedGroupEncryptionKeyPairs(hexEncodedGroupPublicKey)
|
||||
if (encryptionKeyPairs.isEmpty()) {
|
||||
throw Error.NoGroupKeyPair
|
||||
}
|
||||
// Loop through all known group key pairs in reverse order (i.e. try the latest key pair first (which'll more than
|
||||
// likely be the one we want) but try older ones in case that didn't work)
|
||||
var encryptionKeyPair = encryptionKeyPairs.removeLast()
|
||||
fun decrypt() {
|
||||
try {
|
||||
val decryptionResult = MessageDecrypter.decrypt(ciphertext.toByteArray(), encryptionKeyPair)
|
||||
plaintext = decryptionResult.first
|
||||
sender = decryptionResult.second
|
||||
} catch (e: Exception) {
|
||||
if (encryptionKeyPairs.isNotEmpty()) {
|
||||
encryptionKeyPair = encryptionKeyPairs.removeLast()
|
||||
decrypt()
|
||||
} else {
|
||||
Log.e("Loki", "Failed to decrypt group message", e)
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
groupPublicKey = envelope.source
|
||||
decrypt()
|
||||
}
|
||||
groupPublicKey = envelope.source
|
||||
decrypt()
|
||||
}
|
||||
else -> {
|
||||
throw Error.UnknownEnvelopeType
|
||||
@@ -174,7 +187,7 @@ object MessageReceiver {
|
||||
// If the message failed to process the first time around we retry it later (if the error is retryable). In this case the timestamp
|
||||
// will already be in the database but we don't want to treat the message as a duplicate. The isRetry flag is a simple workaround
|
||||
// for this issue.
|
||||
if (groupPublicKey != null && groupPublicKey !in (currentClosedGroups ?: emptySet())) {
|
||||
if (groupPublicKey != null && groupPublicKey !in (currentClosedGroups ?: emptySet()) && groupPublicKey?.startsWith(IdPrefix.GROUP.value) != true) {
|
||||
throw Error.NoGroupThread
|
||||
}
|
||||
if ((message is ClosedGroupControlMessage && message.kind is ClosedGroupControlMessage.Kind.New) || message is SharedConfigurationMessage) {
|
||||
|
@@ -75,6 +75,7 @@ object MessageSender {
|
||||
@Throws(Exception::class)
|
||||
fun buildWrappedMessageToSnode(destination: Destination, message: Message, isSyncMessage: Boolean): SnodeMessage {
|
||||
val storage = MessagingModuleConfiguration.shared.storage
|
||||
val configFactory = MessagingModuleConfiguration.shared.configFactory
|
||||
val userPublicKey = storage.getUserPublicKey()
|
||||
// Set the timestamp, sender and recipient
|
||||
val messageSendTime = SnodeAPI.nowWithOffset
|
||||
@@ -88,6 +89,7 @@ object MessageSender {
|
||||
when (destination) {
|
||||
is Destination.Contact -> message.recipient = destination.publicKey
|
||||
is Destination.LegacyClosedGroup -> message.recipient = destination.groupPublicKey
|
||||
is Destination.ClosedGroup -> message.recipient = destination.publicKey
|
||||
else -> throw IllegalStateException("Destination should not be an open group.")
|
||||
}
|
||||
|
||||
@@ -133,6 +135,12 @@ object MessageSender {
|
||||
)!!
|
||||
MessageEncrypter.encrypt(plaintext, encryptionKeyPair.hexEncodedPublicKey)
|
||||
}
|
||||
is Destination.ClosedGroup -> {
|
||||
val groupKeys = configFactory.getGroupKeysConfig(SessionId.from(destination.publicKey)) ?: throw Error.NoKeyPair
|
||||
groupKeys.use { keys ->
|
||||
keys.encrypt(plaintext)
|
||||
}
|
||||
}
|
||||
else -> throw IllegalStateException("Destination should not be open group.")
|
||||
}
|
||||
// Wrap the result
|
||||
@@ -147,6 +155,10 @@ object MessageSender {
|
||||
kind = SignalServiceProtos.Envelope.Type.CLOSED_GROUP_MESSAGE
|
||||
senderPublicKey = destination.groupPublicKey
|
||||
}
|
||||
is Destination.ClosedGroup -> {
|
||||
kind = SignalServiceProtos.Envelope.Type.CLOSED_GROUP_MESSAGE
|
||||
senderPublicKey = destination.publicKey
|
||||
}
|
||||
else -> throw IllegalStateException("Destination should not be open group.")
|
||||
}
|
||||
val wrappedMessage = MessageWrapper.wrap(kind, message.sentTimestamp!!, senderPublicKey, ciphertext)
|
||||
@@ -165,6 +177,7 @@ object MessageSender {
|
||||
val deferred = deferred<Unit, Exception>()
|
||||
val promise = deferred.promise
|
||||
val storage = MessagingModuleConfiguration.shared.storage
|
||||
val configFactory = MessagingModuleConfiguration.shared.configFactory
|
||||
val userPublicKey = storage.getUserPublicKey()
|
||||
|
||||
// recipient will be set later, so initialize it as a function here
|
||||
@@ -189,7 +202,15 @@ object MessageSender {
|
||||
&& forkInfo.hasNamespaces() -> listOf(Namespace.UNAUTHENTICATED_CLOSED_GROUP, Namespace.DEFAULT)
|
||||
else -> listOf(Namespace.DEFAULT)
|
||||
}
|
||||
namespaces.map { namespace -> SnodeAPI.sendMessage(snodeMessage, requiresAuth = false, namespace = namespace) }.let { promises ->
|
||||
namespaces.map { namespace ->
|
||||
if (destination is Destination.ClosedGroup) {
|
||||
// possibly handle a failure for no user groups or no closed group signing key?
|
||||
val signingKey = configFactory.userGroups!!.getClosedGroup(destination.publicKey)!!.signingKey()
|
||||
SnodeAPI.sendAuthenticatedMessage(snodeMessage, signingKey, namespace = namespace)
|
||||
} else {
|
||||
SnodeAPI.sendMessage(snodeMessage, requiresAuth = false, namespace = namespace)
|
||||
}
|
||||
}.let { promises ->
|
||||
var isSuccess = false
|
||||
val promiseCount = promises.size
|
||||
val errorCount = AtomicInteger(0)
|
||||
|
@@ -10,6 +10,9 @@ import network.loki.messenger.libsession_util.GroupInfoConfig
|
||||
import network.loki.messenger.libsession_util.GroupKeysConfig
|
||||
import network.loki.messenger.libsession_util.GroupMembersConfig
|
||||
import network.loki.messenger.libsession_util.util.GroupInfo
|
||||
import org.session.libsession.messaging.jobs.BatchMessageReceiveJob
|
||||
import org.session.libsession.messaging.jobs.JobQueue
|
||||
import org.session.libsession.messaging.jobs.MessageReceiveParameters
|
||||
import org.session.libsession.snode.RawResponse
|
||||
import org.session.libsession.snode.SnodeAPI
|
||||
import org.session.libsession.utilities.ConfigFactoryProtocol
|
||||
@@ -17,6 +20,7 @@ import org.session.libsignal.utilities.Base64
|
||||
import org.session.libsignal.utilities.Log
|
||||
import org.session.libsignal.utilities.Namespace
|
||||
import org.session.libsignal.utilities.SessionId
|
||||
import org.session.libsignal.utilities.Snode
|
||||
|
||||
class ClosedGroupPoller(private val executor: CoroutineScope,
|
||||
private val closedGroupSessionId: SessionId,
|
||||
@@ -50,6 +54,7 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
|
||||
|
||||
companion object {
|
||||
const val POLL_INTERVAL = 3_000L
|
||||
const val ENABLE_LOGGING = false
|
||||
}
|
||||
|
||||
private var isRunning: Boolean = false
|
||||
@@ -58,7 +63,7 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
|
||||
fun start() {
|
||||
if (isRunning) return // already started, don't restart
|
||||
|
||||
Log.d("ClosedGroupPoller", "Starting closed group poller for ${closedGroupSessionId.hexString().take(4)}")
|
||||
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Starting closed group poller for ${closedGroupSessionId.hexString().take(4)}")
|
||||
job?.cancel()
|
||||
job = executor.launch(Dispatchers.IO) {
|
||||
val closedGroups = configFactoryProtocol.userGroups?: return@launch
|
||||
@@ -69,7 +74,7 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
|
||||
if (nextPoll != null) {
|
||||
delay(nextPoll)
|
||||
} else {
|
||||
Log.d("ClosedGroupPoller", "Stopping the closed group poller")
|
||||
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Stopping the closed group poller")
|
||||
return@launch
|
||||
}
|
||||
}
|
||||
@@ -133,13 +138,13 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
|
||||
// TODO: add the extend duration TTLs for known hashes here
|
||||
|
||||
// if poll result body is null here we don't have any things ig
|
||||
Log.d("ClosedGroupPoller", "Poll results @${SnodeAPI.nowWithOffset}:")
|
||||
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Poll results @${SnodeAPI.nowWithOffset}:")
|
||||
(pollResult["results"] as List<RawResponse>).forEachIndexed { index, response ->
|
||||
when (index) {
|
||||
keysIndex -> handleKeyPoll(response, keys, info, members)
|
||||
infoIndex -> handleInfo(response, info)
|
||||
membersIndex -> handleMembers(response, members)
|
||||
messageIndex -> handleMessages(response, keys)
|
||||
messageIndex -> handleMessages(response, snode)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -149,7 +154,7 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
|
||||
members.free()
|
||||
|
||||
} catch (e: Exception) {
|
||||
Log.e("GroupPoller", "Polling failed for group", e)
|
||||
if (ENABLE_LOGGING) Log.e("GroupPoller", "Polling failed for group", e)
|
||||
return POLL_INTERVAL
|
||||
}
|
||||
return POLL_INTERVAL // this might change in future
|
||||
@@ -158,7 +163,7 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
|
||||
private fun parseMessages(response: RawResponse): List<ParsedRawMessage> {
|
||||
val body = response["body"] as? RawResponse
|
||||
if (body == null) {
|
||||
Log.e("GroupPoller", "Batch parse messages contained no body!")
|
||||
if (ENABLE_LOGGING) Log.e("GroupPoller", "Batch parse messages contained no body!")
|
||||
return emptyList()
|
||||
}
|
||||
val messages = body["messages"] as? List<*> ?: return emptyList()
|
||||
@@ -179,7 +184,7 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
|
||||
// get all the data to hash objects and process them
|
||||
parseMessages(response).forEach { (message, hash, timestamp) ->
|
||||
keysConfig.loadKey(message, hash, timestamp, infoConfig, membersConfig)
|
||||
Log.d("ClosedGroupPoller", "Merged $hash for keys on ${closedGroupSessionId.hexString()}")
|
||||
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Merged $hash for keys on ${closedGroupSessionId.hexString()}")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -187,7 +192,7 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
|
||||
infoConfig: GroupInfoConfig) {
|
||||
parseMessages(response).forEach { (message, hash, _) ->
|
||||
infoConfig.merge(hash to message)
|
||||
Log.d("ClosedGroupPoller", "Merged $hash for info on ${closedGroupSessionId.hexString()}")
|
||||
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Merged $hash for info on ${closedGroupSessionId.hexString()}")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -195,15 +200,22 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
|
||||
membersConfig: GroupMembersConfig) {
|
||||
parseMessages(response).forEach { (message, hash, _) ->
|
||||
membersConfig.merge(hash to message)
|
||||
Log.d("ClosedGroupPoller", "Merged $hash for members on ${closedGroupSessionId.hexString()}")
|
||||
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Merged $hash for members on ${closedGroupSessionId.hexString()}")
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleMessages(response: RawResponse, keysConfig: GroupKeysConfig) {
|
||||
val messages = parseMessages(response)
|
||||
if (messages.isNotEmpty()) {
|
||||
// TODO: process decrypting bundles
|
||||
private fun handleMessages(response: RawResponse, snode: Snode) {
|
||||
val body = response["body"] as RawResponse
|
||||
val messages = SnodeAPI.parseRawMessagesResponse(body, snode, closedGroupSessionId.hexString())
|
||||
val parameters = messages.map { (envelope, serverHash) ->
|
||||
MessageReceiveParameters(envelope.toByteArray(), serverHash = serverHash)
|
||||
}
|
||||
parameters.chunked(BatchMessageReceiveJob.BATCH_DEFAULT_NUMBER).forEach { chunk ->
|
||||
val job = BatchMessageReceiveJob(chunk)
|
||||
JobQueue.shared.add(job)
|
||||
}
|
||||
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "namespace 0 message size: ${messages.size}")
|
||||
|
||||
}
|
||||
|
||||
}
|
@@ -3,7 +3,6 @@
|
||||
package org.session.libsession.snode
|
||||
|
||||
import android.os.Build
|
||||
import androidx.annotation.WorkerThread
|
||||
import com.goterl.lazysodium.LazySodiumAndroid
|
||||
import com.goterl.lazysodium.SodiumAndroid
|
||||
import com.goterl.lazysodium.exceptions.SodiumException
|
||||
@@ -655,34 +654,32 @@ object SnodeAPI {
|
||||
}
|
||||
}
|
||||
|
||||
@WorkerThread
|
||||
fun sendAuthenticatedMessage(message: SnodeMessage, signingKey: ByteArray, namespace: Int): RawResponse {
|
||||
fun sendAuthenticatedMessage(message: SnodeMessage, signingKey: ByteArray, namespace: Int): RawResponsePromise {
|
||||
val pubKey = message.recipient
|
||||
|
||||
return retryIfNeeded(maxRetryCount) {
|
||||
val timestamp = nowWithOffset
|
||||
|
||||
val signature = ByteArray(Sign.BYTES)
|
||||
// assume namespace here is non-zero, as zero namespace doesn't require auth
|
||||
val verificationData = "store$namespace$timestamp".toByteArray()
|
||||
val sigTimestamp = nowWithOffset
|
||||
val verificationData = "store$namespace$sigTimestamp".toByteArray()
|
||||
try {
|
||||
sodium.cryptoSignDetached(signature, verificationData, verificationData.size.toLong(), signingKey)
|
||||
} catch (exception: Exception) {
|
||||
return@retryIfNeeded Promise.ofFail(Error.SigningFailed)
|
||||
}
|
||||
|
||||
val parameters = mapOf(
|
||||
"pubKey" to pubKey,
|
||||
"data" to message.data,
|
||||
"timestamp" to timestamp.toString(),
|
||||
"sig_timestamp" to timestamp.toString(),
|
||||
"signature" to Base64.encodeBytes(verificationData)
|
||||
val parameters = message.toJSON().toMutableMap<String,Any>()
|
||||
|
||||
parameters += mapOf(
|
||||
"sig_timestamp" to sigTimestamp,
|
||||
"signature" to Base64.encodeBytes(signature)
|
||||
)
|
||||
|
||||
getSingleTargetSnode(pubKey).bind { targetSnode ->
|
||||
invoke(Snode.Method.SendMessage, targetSnode, parameters, pubKey)
|
||||
}
|
||||
}.get()
|
||||
}
|
||||
}
|
||||
|
||||
fun sendMessage(message: SnodeMessage, requiresAuth: Boolean = false, namespace: Int = 0): RawResponsePromise {
|
||||
|
@@ -3,6 +3,7 @@ package org.session.libsession.utilities
|
||||
import org.session.libsession.messaging.open_groups.OpenGroup
|
||||
import org.session.libsignal.messages.SignalServiceGroup
|
||||
import org.session.libsignal.utilities.Hex
|
||||
import org.session.libsignal.utilities.IdPrefix
|
||||
import org.session.libsignal.utilities.SessionId
|
||||
import java.io.IOException
|
||||
|
||||
@@ -30,7 +31,9 @@ object GroupUtil {
|
||||
|
||||
@JvmStatic
|
||||
fun getEncodedClosedGroupID(groupID: ByteArray): String {
|
||||
return LEGACY_CLOSED_GROUP_PREFIX + Hex.toStringCondensed(groupID)
|
||||
val hex = Hex.toStringCondensed(groupID)
|
||||
if (hex.startsWith(IdPrefix.GROUP.value)) throw IllegalArgumentException("Trying to encode a new closed group")
|
||||
return LEGACY_CLOSED_GROUP_PREFIX + hex
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
@@ -92,6 +95,7 @@ object GroupUtil {
|
||||
@JvmStatic
|
||||
@Throws(IOException::class)
|
||||
fun doubleEncodeGroupID(groupPublicKey: String): String {
|
||||
if (groupPublicKey.startsWith(IdPrefix.GROUP.value)) throw IllegalArgumentException("Trying to double encode a new closed group")
|
||||
return getEncodedClosedGroupID(getEncodedClosedGroupID(Hex.fromStringCondensed(groupPublicKey)).toByteArray())
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user