fix: no duplicate group left messages, more efficient closed group polling

This commit is contained in:
jubb 2021-03-23 10:00:51 +11:00
parent 8ee58459dd
commit 6f2bad9b59
8 changed files with 52 additions and 7 deletions

View File

@ -372,6 +372,10 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
DatabaseFactory.getGroupDatabase(context).create(groupId, title, members, avatar, relay, admins, formationTimestamp)
}
override fun isGroupActive(groupPublicKey: String): Boolean {
return DatabaseFactory.getGroupDatabase(context).getGroup(GroupUtil.doubleEncodeGroupID(groupPublicKey)).orNull()?.isActive == true
}
override fun setActive(groupID: String, value: Boolean) {
DatabaseFactory.getGroupDatabase(context).setActive(groupID, value)
}
@ -433,6 +437,12 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
return DatabaseFactory.getLokiAPIDatabase(context).getAllClosedGroupPublicKeys()
}
override fun getAllActiveClosedGroupPublicKeys(): Set<String> {
return DatabaseFactory.getLokiAPIDatabase(context).getAllClosedGroupPublicKeys().filter {
getGroup(GroupUtil.doubleEncodeGroupID(it))?.isActive == true
}.toSet()
}
override fun addClosedGroupPublicKey(groupPublicKey: String) {
DatabaseFactory.getLokiAPIDatabase(context).addClosedGroupPublicKey(groupPublicKey)
}

View File

@ -102,11 +102,13 @@ interface StorageProtocol {
// Closed Groups
fun getGroup(groupID: String): GroupRecord?
fun createGroup(groupID: String, title: String?, members: List<Address>, avatar: SignalServiceAttachmentPointer?, relay: String?, admins: List<Address>, formationTimestamp: Long)
fun isGroupActive(groupPublicKey: String): Boolean
fun setActive(groupID: String, value: Boolean)
fun removeMember(groupID: String, member: Address)
fun updateMembers(groupID: String, members: List<Address>)
// Closed Group
fun getAllClosedGroupPublicKeys(): Set<String>
fun getAllActiveClosedGroupPublicKeys(): Set<String>
fun addClosedGroupPublicKey(groupPublicKey: String)
fun removeClosedGroupPublicKey(groupPublicKey: String)
fun addClosedGroupEncryptionKeyPair(encryptionKeyPair: ECKeyPair, groupPublicKey: String)

View File

@ -99,6 +99,8 @@ object MessageReceiver {
else -> throw Error.UnknownEnvelopeType
}
}
// Don't process the envelope any further if the message has been handled already
if (storage.isMessageDuplicated(envelope.timestamp, sender!!) && !isRetry) throw Error.DuplicateMessage
// Don't process the envelope any further if the sender is blocked
if (isBlock(sender!!)) throw Error.SenderBlocked
// Parse the proto

View File

@ -264,6 +264,10 @@ private fun MessageReceiver.handleClosedGroupUpdated(message: ClosedGroupControl
Log.d("Loki", "Ignoring closed group info message for nonexistent group.")
return
}
if (!group.isActive) {
Log.d("Loki", "Ignoring closed group info message for inactive group")
return
}
val oldMembers = group.members.map { it.serialize() }
// Check common group update logic
if (!isValidGroupUpdate(group, message.sentTimestamp!!, senderPublicKey)) {
@ -312,6 +316,10 @@ private fun MessageReceiver.handleClosedGroupEncryptionKeyPair(message: ClosedGr
Log.d("Loki", "Ignoring closed group info message for nonexistent group.")
return
}
if (!group.isActive) {
Log.d("Loki", "Ignoring closed group info message for inactive group")
return
}
if (!group.members.map { it.toString() }.contains(senderPublicKey)) {
Log.d("Loki", "Ignoring closed group encryption key pair from non-member.")
return
@ -345,6 +353,10 @@ private fun MessageReceiver.handleClosedGroupNameChanged(message: ClosedGroupCon
Log.d("Loki", "Ignoring closed group info message for nonexistent group.")
return
}
if (!group.isActive) {
Log.d("Loki", "Ignoring closed group info message for inactive group")
return
}
// Check common group update logic
if (!isValidGroupUpdate(group, message.sentTimestamp!!, senderPublicKey)) {
return
@ -369,6 +381,10 @@ private fun MessageReceiver.handleClosedGroupMembersAdded(message: ClosedGroupCo
Log.d("Loki", "Ignoring closed group info message for nonexistent group.")
return
}
if (!group.isActive) {
Log.d("Loki", "Ignoring closed group info message for inactive group")
return
}
if (!isValidGroupUpdate(group, message.sentTimestamp!!, senderPublicKey)) { return }
val name = group.title
// Check common group update logic
@ -411,6 +427,10 @@ private fun MessageReceiver.handleClosedGroupMembersRemoved(message: ClosedGroup
Log.d("Loki", "Ignoring closed group info message for nonexistent group.")
return
}
if (!group.isActive) {
Log.d("Loki", "Ignoring closed group info message for inactive group")
return
}
val name = group.title
// Check common group update logic
val members = group.members.map { it.serialize() }
@ -460,6 +480,10 @@ private fun MessageReceiver.handleClosedGroupMemberLeft(message: ClosedGroupCont
Log.d("Loki", "Ignoring closed group info message for nonexistent group.")
return
}
if (!group.isActive) {
Log.d("Loki", "Ignoring closed group info message for inactive group")
return
}
val name = group.title
// Check common group update logic
val members = group.members.map { it.serialize() }

View File

@ -337,6 +337,6 @@ object MessageSender {
@JvmStatic
fun explicitLeave(groupPublicKey: String): Promise<Unit, Exception> {
return leave(groupPublicKey)
return leave(groupPublicKey, false)
}
}

View File

@ -209,6 +209,7 @@ fun MessageSender.leave(groupPublicKey: String, notifyUser: Boolean = true): Pro
val closedGroupControlMessage = ClosedGroupControlMessage(ClosedGroupControlMessage.Kind.MemberLeft())
val sentTime = System.currentTimeMillis()
closedGroupControlMessage.sentTimestamp = sentTime
storage.setActive(groupID, false)
sendNonDurably(closedGroupControlMessage, Address.fromSerialized(groupID)).success {
// Notify the user
val infoType = SignalServiceProtos.GroupContext.Type.QUIT
@ -219,6 +220,8 @@ fun MessageSender.leave(groupPublicKey: String, notifyUser: Boolean = true): Pro
// Remove the group private key and unsubscribe from PNs
MessageReceiver.disableLocalGroupAndUnsubscribe(groupPublicKey, groupID, userPublicKey)
deferred.resolve(Unit)
}.fail {
storage.setActive(groupID, true)
}
}
return deferred.promise

View File

@ -59,7 +59,7 @@ class ClosedGroupPoller {
// region Private API
private fun poll(): List<Promise<Unit, Exception>> {
if (!isPolling) { return listOf() }
val publicKeys = MessagingConfiguration.shared.storage.getAllClosedGroupPublicKeys()
val publicKeys = MessagingConfiguration.shared.storage.getAllActiveClosedGroupPublicKeys()
return publicKeys.map { publicKey ->
val promise = SnodeAPI.getSwarm(publicKey).bind { swarm ->
val snode = swarm.getRandomElementOrNull() ?: throw InsufficientSnodesException() // Should be cryptographically secure
@ -67,6 +67,10 @@ class ClosedGroupPoller {
SnodeAPI.getRawMessages(snode, publicKey).map {SnodeAPI.parseRawMessagesResponse(it, snode, publicKey) }
}
promise.successBackground { messages ->
if (!MessagingConfiguration.shared.storage.isGroupActive(publicKey)) {
// ignore inactive group's messages
return@successBackground
}
if (messages.isNotEmpty()) {
Log.d("Loki", "Received ${messages.count()} new message(s) in closed group with public key: $publicKey.")
}

View File

@ -2,11 +2,11 @@ package org.session.libsession.snode
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
import org.session.libsignal.utilities.JsonUtil
import org.session.libsession.utilities.AESGCM.EncryptionResult
import org.session.libsession.utilities.AESGCM
import org.session.libsignal.utilities.ThreadUtils
import org.session.libsession.utilities.AESGCM.EncryptionResult
import org.session.libsignal.service.loki.utilities.toHexString
import org.session.libsignal.utilities.JsonUtil
import org.session.libsignal.utilities.ThreadUtils
import java.nio.Buffer
import java.nio.ByteBuffer
import java.nio.ByteOrder
@ -62,7 +62,7 @@ object OnionRequestEncryption {
*/
internal fun encryptHop(lhs: OnionRequestAPI.Destination, rhs: OnionRequestAPI.Destination, previousEncryptionResult: EncryptionResult): Promise<EncryptionResult, Exception> {
val deferred = deferred<EncryptionResult, Exception>()
Thread {
ThreadUtils.queue {
try {
val payload: MutableMap<String, Any>
when (rhs) {
@ -89,7 +89,7 @@ object OnionRequestEncryption {
} catch (exception: Exception) {
deferred.reject(exception)
}
}.start()
}
return deferred.promise
}
}