Improvement

This commit is contained in:
SessionHero01
2024-09-24 16:26:29 +10:00
parent 80e3e563ce
commit 8c1eb1550b
17 changed files with 959 additions and 489 deletions

View File

@@ -170,19 +170,14 @@ interface StorageProtocol {
fun updateTimestampUpdated(groupID: String, updatedTimestamp: Long)
// Closed Groups
fun createNewGroup(groupName: String, groupDescription: String, members: Set<Contact>): Optional<Recipient>
fun getMembers(groupPublicKey: String): List<LibSessionGroupMember>
fun respondToClosedGroupInvitation(threadId: Long, groupRecipient: Recipient, approved: Boolean)
fun addClosedGroupInvite(groupId: AccountId, name: String, authData: ByteArray?, adminKey: ByteArray?, invitingAdmin: AccountId, invitingMessageHash: String?)
fun setGroupInviteCompleteIfNeeded(approved: Boolean, invitee: String, closedGroup: AccountId)
fun getLibSessionClosedGroup(groupAccountId: String): GroupInfo.ClosedGroupInfo?
fun getClosedGroupDisplayInfo(groupAccountId: String): GroupDisplayInfo?
fun insertGroupInfoChange(message: GroupUpdated, closedGroup: AccountId): Long?
fun insertGroupInfoLeaving(closedGroup: AccountId): Long?
fun insertGroupInviteControlMessage(sentTimestamp: Long, senderPublicKey: String, closedGroup: AccountId, groupName: String): Long?
fun updateGroupInfoChange(messageId: Long, newType: UpdateMessageData.Kind)
fun handleMemberLeftNotification(message: GroupUpdated, closedGroupId: AccountId)
fun handleKicked(groupAccountId: AccountId)
fun setName(groupSessionId: String, newName: String)
fun sendGroupUpdateDeleteMessage(groupSessionId: String, messageHashes: List<String>): Promise<Unit, Exception>
// Groups

View File

@@ -1,6 +1,8 @@
package org.session.libsession.messaging.groups
import org.session.libsession.messaging.contacts.Contact
import org.session.libsession.messaging.messages.control.GroupUpdated
import org.session.libsession.utilities.recipients.Recipient
import org.session.libsignal.utilities.AccountId
/**
@@ -8,6 +10,12 @@ import org.session.libsignal.utilities.AccountId
* removing members, promoting members, leaving groups, etc.
*/
interface GroupManagerV2 {
suspend fun createGroup(
groupName: String,
groupDescription: String,
members: Set<Contact>
): Recipient
suspend fun inviteMembers(
group: AccountId,
newMembers: List<AccountId>,
@@ -25,4 +33,28 @@ interface GroupManagerV2 {
suspend fun leaveGroup(group: AccountId, deleteOnLeave: Boolean)
suspend fun promoteMember(group: AccountId, members: List<AccountId>)
suspend fun onReceiveInvitation(
groupId: AccountId,
groupName: String,
authData: ByteArray,
inviter: AccountId,
inviteMessageHash: String?
)
suspend fun onReceivePromotion(
groupId: AccountId,
groupName: String,
adminKey: ByteArray,
promoter: AccountId,
promoteMessageHash: String?
)
suspend fun respondToInvitation(groupId: AccountId, approved: Boolean): Unit?
suspend fun handleInviteResponse(groupId: AccountId, sender: AccountId, approved: Boolean)
suspend fun handleKicked(groupId: AccountId)
suspend fun setName(groupId: AccountId, newName: String)
}

View File

@@ -43,7 +43,6 @@ import org.session.libsession.messaging.utilities.SodiumUtilities
import org.session.libsession.messaging.utilities.WebRtcUtils
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.utilities.Address
import org.session.libsession.utilities.Address.Companion.fromSerialized
import org.session.libsession.utilities.GroupRecord
import org.session.libsession.utilities.GroupUtil
import org.session.libsession.utilities.GroupUtil.doubleEncodeGroupID
@@ -173,7 +172,7 @@ private fun MessageReceiver.handleExpirationTimerUpdate(message: ExpirationTimer
val module = MessagingModuleConfiguration.shared
try {
val threadId = fromSerialized(message.groupPublicKey?.let(::doubleEncodeGroupID) ?: message.sender!!)
val threadId = Address.fromSerialized(message.groupPublicKey?.let(::doubleEncodeGroupID) ?: message.sender!!)
.let(module.storage::getOrCreateThreadIdFor)
module.storage.setExpirationConfiguration(
@@ -363,11 +362,18 @@ fun MessageReceiver.handleVisibleMessage(
}
// Handle group invite response if new closed group
if (threadRecipient?.isClosedGroupV2Recipient == true) {
storage.setGroupInviteCompleteIfNeeded(
approved = true,
recipient.address.serialize(),
AccountId(threadRecipient.address.serialize())
)
GlobalScope.launch {
try {
MessagingModuleConfiguration.shared.groupManagerV2
.handleInviteResponse(
AccountId(threadRecipient.address.serialize()),
AccountId(messageSender),
approved = true
)
} catch (e: Exception) {
Log.e("Loki", "Failed to handle invite response", e)
}
}
}
// Parse quote if needed
var quoteModel: QuoteModel? = null
@@ -659,20 +665,25 @@ private fun handleGroupInfoChange(message: GroupUpdated, closedGroup: AccountId)
}
private fun handlePromotionMessage(message: GroupUpdated) {
val storage = MessagingModuleConfiguration.shared.storage
val promotion = message.inner.promoteMessage
val seed = promotion.groupIdentitySeed.toByteArray()
val keyPair = Sodium.ed25519KeyPair(seed)
val sender = message.sender!!
val adminId = AccountId(sender)
storage.addClosedGroupInvite(
groupId = AccountId(IdPrefix.GROUP, keyPair.pubKey),
name = promotion.name,
authData = null,
adminKey = keyPair.secretKey,
invitingAdmin = adminId,
message.serverHash
)
GlobalScope.launch {
try {
MessagingModuleConfiguration.shared.groupManagerV2
.onReceivePromotion(
groupId = AccountId(IdPrefix.GROUP, keyPair.pubKey),
groupName = promotion.name,
adminKey = keyPair.secretKey,
promoter = adminId,
promoteMessageHash = message.serverHash
)
} catch (e: Exception) {
Log.e("GroupUpdated", "Failed to handle promotion message", e)
}
}
}
private fun MessageReceiver.handleInviteResponse(message: GroupUpdated, closedGroup: AccountId) {
@@ -680,7 +691,13 @@ private fun MessageReceiver.handleInviteResponse(message: GroupUpdated, closedGr
// val profile = message // maybe we do need data to be the inner so we can access profile
val storage = MessagingModuleConfiguration.shared.storage
val approved = message.inner.inviteResponse.isApproved
storage.setGroupInviteCompleteIfNeeded(approved, sender, closedGroup)
GlobalScope.launch {
try {
MessagingModuleConfiguration.shared.groupManagerV2.handleInviteResponse(closedGroup, AccountId(sender), approved)
} catch (e: Exception) {
Log.e("GroupUpdated", "Failed to handle invite response", e)
}
}
}
private fun MessageReceiver.handleNewLibSessionClosedGroupMessage(message: GroupUpdated) {
@@ -696,15 +713,20 @@ private fun MessageReceiver.handleNewLibSessionClosedGroupMessage(message: Group
val sender = message.sender!!
val adminId = AccountId(sender)
// add the group
storage.addClosedGroupInvite(
groupId,
invite.name,
invite.memberAuthData.toByteArray(),
null,
adminId,
message.serverHash
)
GlobalScope.launch {
try {
MessagingModuleConfiguration.shared.groupManagerV2
.onReceiveInvitation(
groupId = groupId,
groupName = invite.name,
authData = invite.memberAuthData.toByteArray(),
inviter = adminId,
inviteMessageHash = message.serverHash
)
} catch (e: Exception) {
Log.e("GroupUpdated", "Failed to handle invite message", e)
}
}
}
/**
@@ -768,7 +790,7 @@ private fun handleNewClosedGroup(sender: String, sentTimestamp: Long, groupPubli
storage.updateTitle(groupID, name)
storage.updateMembers(groupID, members.map { Address.fromSerialized(it) })
} else {
storage.createGroup(groupID, name, LinkedList(members.map { fromSerialized(it) }),
storage.createGroup(groupID, name, LinkedList(members.map { Address.fromSerialized(it) }),
null, null, LinkedList(admins.map { Address.fromSerialized(it) }), formationTimestamp)
}
storage.setProfileSharing(Address.fromSerialized(groupID), true)

View File

@@ -2,6 +2,7 @@ package org.session.libsession.messaging.sending_receiving.pollers
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
@@ -13,6 +14,7 @@ import network.loki.messenger.libsession_util.util.GroupInfo
import network.loki.messenger.libsession_util.util.Sodium
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.groups.GroupManagerV2
import org.session.libsession.messaging.jobs.BatchMessageReceiveJob
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveParameters
@@ -38,7 +40,7 @@ class ClosedGroupPoller(
private val executor: CoroutineDispatcher,
private val closedGroupSessionId: AccountId,
private val configFactoryProtocol: ConfigFactoryProtocol,
private val storageProtocol: StorageProtocol = MessagingModuleConfiguration.shared.storage) {
private val groupManagerV2: GroupManagerV2) {
data class ParsedRawMessage(
val data: ByteArray,
@@ -268,40 +270,14 @@ class ClosedGroupPoller(
if (Sodium.KICKED_REGEX.matches(message)) {
val (sessionId, generation) = message.split("-")
if (sessionId == userSessionId.hexString && generation.toInt() >= keys.currentGeneration()) {
Log.d("GroupPoller", "We were kicked from the group, delete and stop polling")
stop()
configFactoryProtocol.userGroups?.let { userGroups ->
userGroups.getClosedGroup(closedGroupSessionId.hexString)?.let { group ->
// Retrieve the group name one last time from the group info,
// as we are going to clear the keys, we won't have the chance to
// read the group name anymore.
val groupName = configFactoryProtocol.getGroupInfoConfig(closedGroupSessionId)
?.use { it.getName() }
?: group.name
userGroups.set(group.copy(
authData = null,
adminKey = null,
name = groupName
))
configFactoryProtocol.persist(userGroups, SnodeAPI.nowWithOffset)
GlobalScope.launch {
try {
groupManagerV2.handleKicked(closedGroupSessionId)
} catch (e: Exception) {
Log.e("GroupPoller", "Error handling kicked message: $e")
}
}
storageProtocol.handleKicked(closedGroupSessionId)
MessagingModuleConfiguration.shared.storage.insertIncomingInfoMessage(
context = MessagingModuleConfiguration.shared.context,
senderPublicKey = userSessionId.hexString,
groupID = closedGroupSessionId.hexString,
type = SignalServiceGroup.Type.KICKED,
name = "",
members = emptyList(),
admins = emptyList(),
sentTimestamp = SnodeAPI.nowWithOffset,
)
}
}
}

View File

@@ -2,6 +2,8 @@
package org.session.libsession.snode
import android.os.SystemClock
import com.fasterxml.jackson.databind.JsonNode
import com.goterl.lazysodium.exceptions.SodiumException
import com.goterl.lazysodium.interfaces.GenericHash
import com.goterl.lazysodium.interfaces.PwHash
@@ -9,8 +11,18 @@ import com.goterl.lazysodium.interfaces.SecretBox
import com.goterl.lazysodium.utils.Key
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.all
import nl.komponents.kovenant.functional.bind
@@ -43,6 +55,8 @@ import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.collections.set
import kotlin.properties.Delegates.observable
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
object SnodeAPI {
internal val database: LokiAPIDatabaseProtocol
@@ -115,7 +129,7 @@ object SnodeAPI {
val method: String,
val params: Map<String, Any>,
@Transient
val namespace: Int?
val namespace: Int?,
) // assume signatures, pubkey and namespaces are attached in parameters if required
// Internal API
@@ -571,6 +585,99 @@ object SnodeAPI {
}
}
private data class RequestInfo(
val accountId: AccountId,
val request: SnodeBatchRequestInfo,
val responseType: Class<*>,
val callback: SendChannel<Result<Any>>,
val requestTime: Long = SystemClock.uptimeMillis(),
)
private val batchedRequestsSender: SendChannel<RequestInfo>
init {
val batchRequests = Channel<RequestInfo>()
batchedRequestsSender = batchRequests
val batchWindowMills = 100L
@Suppress("OPT_IN_USAGE")
GlobalScope.launch {
val batches = hashMapOf<AccountId, MutableList<RequestInfo>>()
while (true) {
val batch = select<List<RequestInfo>?> {
// If we receive a request, add it to the batch
batchRequests.onReceive {
batches.getOrPut(it.accountId) { mutableListOf() }.add(it)
null
}
// If we have anything in the batch, look for the one that is about to expire
// and wait for it to expire, remove it from the batches and send it for
// processing.
if (batches.isNotEmpty()) {
val earliestBatch = batches.minBy { it.value.first().requestTime }
val deadline = earliestBatch.value.first().requestTime + batchWindowMills
onTimeout(
timeMillis = (deadline - SystemClock.uptimeMillis()).coerceAtLeast(0)
) {
batches.remove(earliestBatch.key)
}
}
}
if (batch != null) {
launch {
val accountId = batch.first().accountId
val responses = try {
getBatchResponse(
snode = getSingleTargetSnode(accountId.hexString).await(),
publicKey = accountId.hexString,
requests = batch.map { it.request }, sequence = false
)
} catch (e: Exception) {
for (req in batch) {
req.callback.send(Result.failure(e))
}
return@launch
}
for ((req, resp) in batch.zip(responses.results)) {
req.callback.send(kotlin.runCatching {
JsonUtil.fromJson(resp.body, req.responseType)
})
}
// Close all channels in the requests just in case we don't have paired up
// responses.
for (req in batch) {
req.callback.close()
}
}
}
}
}
}
suspend fun <T> sendBatchRequest(
swarmAccount: AccountId,
request: SnodeBatchRequestInfo,
responseType: Class<T>,
): T {
val callback = Channel<Result<T>>()
@Suppress("UNCHECKED_CAST")
batchedRequestsSender.send(RequestInfo(swarmAccount, request, responseType, callback as SendChannel<Any>))
return callback.receive().getOrThrow()
}
suspend fun sendBatchRequest(
swarmAccount: AccountId,
request: SnodeBatchRequestInfo,
): JsonNode {
return sendBatchRequest(swarmAccount, request, JsonNode::class.java)
}
suspend fun getBatchResponse(
snode: Snode,
publicKey: String,
@@ -697,8 +804,15 @@ object SnodeAPI {
return scope.retrySuspendAsPromise(maxRetryCount) {
val destination = message.recipient
val snode = getSingleTargetSnode(destination).await()
invoke(Snode.Method.SendMessage, snode, params, destination).await()
sendBatchRequest(
swarmAccount = AccountId(destination),
request = SnodeBatchRequestInfo(
method = Snode.Method.SendMessage.rawValue,
params = params,
namespace = namespace
),
responseType = Map::class.java
)
}
}

View File

@@ -2,12 +2,13 @@ package org.session.libsession.snode.model
import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.JsonNode
data class BatchResponse @JsonCreator constructor(
@param:JsonProperty("results") val results: List<Item>,
) {
data class Item @JsonCreator constructor(
@param:JsonProperty("code") val code: Int,
@param:JsonProperty("body") val body: Map<String, Any?>?,
@param:JsonProperty("body") val body: JsonNode,
)
}

View File

@@ -0,0 +1,9 @@
package org.session.libsession.snode.model
import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty
data class StoreMessageResponse @JsonCreator constructor(
@JsonProperty("hash") val hash: String,
@JsonProperty("t") val timestamp: Long,
)