Various issues

This commit is contained in:
SessionHero01 2024-10-02 15:17:13 +10:00
parent 3faae5ddbe
commit 1f5fde0d9a
No known key found for this signature in database
13 changed files with 133 additions and 104 deletions

View File

@ -41,6 +41,7 @@ import org.session.libsession.utilities.MutableUserConfigs
import org.session.libsession.utilities.TextSecurePreferences import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsession.utilities.UserConfigType import org.session.libsession.utilities.UserConfigType
import org.session.libsession.utilities.UserConfigs import org.session.libsession.utilities.UserConfigs
import org.session.libsession.utilities.getClosedGroup
import org.session.libsignal.crypto.ecc.DjbECPublicKey import org.session.libsignal.crypto.ecc.DjbECPublicKey
import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Hex import org.session.libsignal.utilities.Hex
@ -299,11 +300,7 @@ class ConfigFactory @Inject constructor(
override fun <T> withGroupConfigs(groupId: AccountId, cb: (GroupConfigs) -> T): T { override fun <T> withGroupConfigs(groupId: AccountId, cb: (GroupConfigs) -> T): T {
val configs = groupConfigs.getOrPut(groupId) { val configs = groupConfigs.getOrPut(groupId) {
val groupAdminKey = requireNotNull(withUserConfigs { val groupAdminKey = getClosedGroup(groupId)?.adminKey
it.userGroups.getClosedGroup(groupId.hexString)
}) {
"Group not found"
}.adminKey
GroupConfigsImpl( GroupConfigsImpl(
requiresCurrentUserED25519SecKey(), requiresCurrentUserED25519SecKey(),
@ -318,7 +315,14 @@ class ConfigFactory @Inject constructor(
} }
} }
private fun <T> doWithMutableGroupConfigs(groupId: AccountId, cb: (GroupConfigsImpl) -> Pair<T, Boolean>): T { private fun <T> doWithMutableGroupConfigs(
groupId: AccountId,
recreateConfigInstances: Boolean,
cb: (GroupConfigsImpl) -> Pair<T, Boolean>): T {
if (recreateConfigInstances) {
groupConfigs.remove(groupId)
}
val (result, changed) = withGroupConfigs(groupId) { configs -> val (result, changed) = withGroupConfigs(groupId) { configs ->
cb(configs as GroupConfigsImpl) cb(configs as GroupConfigsImpl)
} }
@ -336,9 +340,10 @@ class ConfigFactory @Inject constructor(
override fun <T> withMutableGroupConfigs( override fun <T> withMutableGroupConfigs(
groupId: AccountId, groupId: AccountId,
recreateConfigInstances: Boolean,
cb: (MutableGroupConfigs) -> T cb: (MutableGroupConfigs) -> T
): T { ): T {
return doWithMutableGroupConfigs(groupId) { return doWithMutableGroupConfigs(recreateConfigInstances = recreateConfigInstances, groupId = groupId) {
cb(it) to it.dumpIfNeeded() cb(it) to it.dumpIfNeeded()
} }
} }
@ -376,7 +381,7 @@ class ConfigFactory @Inject constructor(
info: List<ConfigMessage>, info: List<ConfigMessage>,
members: List<ConfigMessage> members: List<ConfigMessage>
) { ) {
doWithMutableGroupConfigs(groupId) { configs -> doWithMutableGroupConfigs(groupId, false) { configs ->
// Keys must be loaded first as they are used to decrypt the other config messages // Keys must be loaded first as they are used to decrypt the other config messages
val keysLoaded = keys.fold(false) { acc, msg -> val keysLoaded = keys.fold(false) { acc, msg ->
configs.groupKeys.loadKey(msg.data, msg.hash, msg.timestamp, configs.groupInfo.pointer, configs.groupMembers.pointer) || acc configs.groupKeys.loadKey(msg.data, msg.hash, msg.timestamp, configs.groupInfo.pointer, configs.groupMembers.pointer) || acc
@ -424,7 +429,7 @@ class ConfigFactory @Inject constructor(
return return
} }
doWithMutableGroupConfigs(groupId) { configs -> doWithMutableGroupConfigs(groupId, false) { configs ->
members?.let { (push, result) -> configs.groupMembers.confirmPushed(push.seqNo, result.hash) } members?.let { (push, result) -> configs.groupMembers.confirmPushed(push.seqNo, result.hash) }
info?.let { (push, result) -> configs.groupInfo.confirmPushed(push.seqNo, result.hash) } info?.let { (push, result) -> configs.groupInfo.confirmPushed(push.seqNo, result.hash) }
keysPush?.let { (hash, timestamp) -> keysPush?.let { (hash, timestamp) ->

View File

@ -634,7 +634,7 @@ class GroupManagerV2Impl @Inject constructor(
pollerFactory.pollerFor(group.groupAccountId)?.start() pollerFactory.pollerFor(group.groupAccountId)?.start()
} }
override suspend fun onReceiveInvitation( override suspend fun handleInvitation(
groupId: AccountId, groupId: AccountId,
groupName: String, groupName: String,
authData: ByteArray, authData: ByteArray,
@ -663,7 +663,7 @@ class GroupManagerV2Impl @Inject constructor(
} }
} }
override suspend fun onReceivePromotion( override suspend fun handlePromotion(
groupId: AccountId, groupId: AccountId,
groupName: String, groupName: String,
adminKey: ByteArray, adminKey: ByteArray,
@ -692,7 +692,7 @@ class GroupManagerV2Impl @Inject constructor(
} }
// Update our promote state // Update our promote state
configFactory.withMutableGroupConfigs(groupId) { configs -> configFactory.withMutableGroupConfigs(recreateConfigInstances = true, groupId = groupId) { configs ->
configs.groupMembers.get(userAuth.accountId.hexString)?.let { member -> configs.groupMembers.get(userAuth.accountId.hexString)?.let { member ->
configs.groupMembers.set(member.setPromoteSuccess()) configs.groupMembers.set(member.setPromoteSuccess())
} }

View File

@ -60,7 +60,7 @@ internal class LoadingViewModel @Inject constructor(
val events = _events.asSharedFlow() val events = _events.asSharedFlow()
init { init {
viewModelScope.launch(Dispatchers.IO) { viewModelScope.launch {
state.flatMapLatest { state.flatMapLatest {
when (it) { when (it) {
State.LOADING -> progress(0f, 1f, TIMEOUT_TIME) State.LOADING -> progress(0f, 1f, TIMEOUT_TIME)

View File

@ -33,6 +33,7 @@ import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace import org.session.libsignal.utilities.Namespace
import org.session.libsignal.utilities.Snode import org.session.libsignal.utilities.Snode
import org.session.libsignal.utilities.retryWithUniformInterval
import java.util.concurrent.Executors import java.util.concurrent.Executors
import javax.inject.Inject import javax.inject.Inject
@ -59,7 +60,6 @@ class ConfigSyncHandler @Inject constructor(
configFactory.configUpdateNotifications configFactory.configUpdateNotifications
.collect { changes -> .collect { changes ->
try {
when (changes) { when (changes) {
is ConfigUpdateNotification.GroupConfigsDeleted -> { is ConfigUpdateNotification.GroupConfigsDeleted -> {
groupMutex.remove(changes.groupId) groupMutex.remove(changes.groupId)
@ -68,24 +68,32 @@ class ConfigSyncHandler @Inject constructor(
is ConfigUpdateNotification.GroupConfigsUpdated -> { is ConfigUpdateNotification.GroupConfigsUpdated -> {
// Group config pushing is limited to its own dispatcher // Group config pushing is limited to its own dispatcher
launch { launch {
groupMutex.getOrPut(changes.groupId) { Mutex() }.withLock { try {
pushGroupConfigsChangesIfNeeded(changes.groupId) retryWithUniformInterval {
groupMutex.getOrPut(changes.groupId) { Mutex() }.withLock {
pushGroupConfigsChangesIfNeeded(changes.groupId)
}
}
} catch (e: Exception) {
Log.e(TAG, "Failed to push group configs", e)
} }
} }
} }
ConfigUpdateNotification.UserConfigs -> launch { ConfigUpdateNotification.UserConfigs -> launch {
userMutex.withLock { try {
pushUserConfigChangesIfNeeded() retryWithUniformInterval {
userMutex.withLock {
pushUserConfigChangesIfNeeded()
}
}
} catch (e: Exception) {
Log.e(TAG, "Failed to push user configs", e)
} }
} }
} }
} catch (e: Exception) {
Log.e(TAG, "Error handling config update", e)
} }
}
} }
} }
private suspend fun pushGroupConfigsChangesIfNeeded(groupId: AccountId) = coroutineScope { private suspend fun pushGroupConfigsChangesIfNeeded(groupId: AccountId) = coroutineScope {
@ -241,11 +249,17 @@ class ConfigSyncHandler @Inject constructor(
val pushTasks = pushes.map { (configType, configPush) -> val pushTasks = pushes.map { (configType, configPush) ->
async { async {
(configType to configPush) to pushConfig(userAuth, snode, configPush, configType.namespace) (configType to configPush) to pushConfig(
userAuth,
snode,
configPush,
configType.namespace
)
} }
} }
val pushResults = pushTasks.awaitAll().associate { it.first.first to (it.first.second to it.second) } val pushResults =
pushTasks.awaitAll().associate { it.first.first to (it.first.second to it.second) }
Log.d(TAG, "Pushed ${pushResults.size} user configs") Log.d(TAG, "Pushed ${pushResults.size} user configs")

View File

@ -3,7 +3,6 @@ package org.session.libsession.messaging.groups
import org.session.libsession.messaging.contacts.Contact import org.session.libsession.messaging.contacts.Contact
import org.session.libsession.messaging.messages.control.GroupUpdated import org.session.libsession.messaging.messages.control.GroupUpdated
import org.session.libsession.utilities.recipients.Recipient import org.session.libsession.utilities.recipients.Recipient
import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateDeleteMemberContentMessage import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateDeleteMemberContentMessage
import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.AccountId
@ -36,7 +35,7 @@ interface GroupManagerV2 {
suspend fun promoteMember(group: AccountId, members: List<AccountId>) suspend fun promoteMember(group: AccountId, members: List<AccountId>)
suspend fun onReceiveInvitation( suspend fun handleInvitation(
groupId: AccountId, groupId: AccountId,
groupName: String, groupName: String,
authData: ByteArray, authData: ByteArray,
@ -44,7 +43,7 @@ interface GroupManagerV2 {
inviteMessageHash: String? inviteMessageHash: String?
) )
suspend fun onReceivePromotion( suspend fun handlePromotion(
groupId: AccountId, groupId: AccountId,
groupName: String, groupName: String,
adminKey: ByteArray, adminKey: ByteArray,

View File

@ -94,7 +94,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
} }
private fun handleFailure(dispatcherName: String, error: Exception) { private fun handleFailure(dispatcherName: String, error: Exception) {
Log.w(TAG, "Failed to send $message::class.simpleName.") Log.w(TAG, "Failed to send $message::class.simpleName.", error)
val message = message as? VisibleMessage val message = message as? VisibleMessage
if (message != null) { if (message != null) {
if (!MessagingModuleConfiguration.shared.messageDataProvider.isOutgoingMessage(message.sentTimestamp!!)) { if (!MessagingModuleConfiguration.shared.messageDataProvider.isOutgoingMessage(message.sentTimestamp!!)) {

View File

@ -4,7 +4,9 @@ import org.session.libsignal.protos.SignalServiceProtos.Content
import org.session.libsignal.protos.SignalServiceProtos.DataMessage import org.session.libsignal.protos.SignalServiceProtos.DataMessage
import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateMessage import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateMessage
class GroupUpdated(val inner: GroupUpdateMessage): ControlMessage() { class GroupUpdated @JvmOverloads constructor(
val inner: GroupUpdateMessage = GroupUpdateMessage.getDefaultInstance()
): ControlMessage() {
override fun isValid(): Boolean { override fun isValid(): Boolean {
return true // TODO: add the validation here return true // TODO: add the validation here

View File

@ -6,7 +6,6 @@ import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import network.loki.messenger.libsession_util.util.ExpiryMode import network.loki.messenger.libsession_util.util.ExpiryMode
import network.loki.messenger.libsession_util.util.Sodium import network.loki.messenger.libsession_util.util.Sodium
import network.loki.messenger.libsession_util.util.afterSend
import org.session.libsession.avatars.AvatarHelper import org.session.libsession.avatars.AvatarHelper
import org.session.libsession.database.userAuth import org.session.libsession.database.userAuth
import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.MessagingModuleConfiguration
@ -660,7 +659,7 @@ private fun handlePromotionMessage(message: GroupUpdated) {
GlobalScope.launch { GlobalScope.launch {
try { try {
MessagingModuleConfiguration.shared.groupManagerV2 MessagingModuleConfiguration.shared.groupManagerV2
.onReceivePromotion( .handlePromotion(
groupId = AccountId(IdPrefix.GROUP, keyPair.pubKey), groupId = AccountId(IdPrefix.GROUP, keyPair.pubKey),
groupName = promotion.name, groupName = promotion.name,
adminKey = keyPair.secretKey, adminKey = keyPair.secretKey,
@ -703,7 +702,7 @@ private fun MessageReceiver.handleNewLibSessionClosedGroupMessage(message: Group
GlobalScope.launch { GlobalScope.launch {
try { try {
MessagingModuleConfiguration.shared.groupManagerV2 MessagingModuleConfiguration.shared.groupManagerV2
.onReceiveInvitation( .handleInvitation(
groupId = groupId, groupId = groupId,
groupName = invite.name, groupName = invite.name,
authData = invite.memberAuthData.toByteArray(), authData = invite.memberAuthData.toByteArray(),

View File

@ -42,6 +42,7 @@ class ClosedGroupPoller(
) { ) {
companion object { companion object {
private const val POLL_INTERVAL = 3_000L private const val POLL_INTERVAL = 3_000L
private const val POLL_ERROR_RETRY_DELAY = 10_000L
private const val TAG = "ClosedGroupPoller" private const val TAG = "ClosedGroupPoller"
} }
@ -54,34 +55,43 @@ class ClosedGroupPoller(
Log.d(TAG, "Starting closed group poller for ${closedGroupSessionId.hexString.take(4)}") Log.d(TAG, "Starting closed group poller for ${closedGroupSessionId.hexString.take(4)}")
job?.cancel() job?.cancel()
job = scope.launch(executor) { job = scope.launch(executor) {
var snode: Snode? = null
while (isActive) { while (isActive) {
configFactoryProtocol.getClosedGroup(closedGroupSessionId) ?: break try {
val swarmNodes = SnodeAPI.getSwarm(closedGroupSessionId.hexString).await().toMutableSet()
var currentSnode: Snode? = null
if (snode == null) { while (isActive) {
Log.i(TAG, "No Snode, fetching one") if (currentSnode == null) {
snode = SnodeAPI.getSingleTargetSnode(closedGroupSessionId.hexString).await() check(swarmNodes.isNotEmpty()) { "No swarm nodes found" }
} Log.d(TAG, "No current snode, getting a new one. Remaining in pool = ${swarmNodes.size - 1}")
currentSnode = swarmNodes.random()
swarmNodes.remove(currentSnode)
}
val nextPoll = runCatching { poll(snode!!) } val result = runCatching { poll(currentSnode!!) }
when { when {
nextPoll.isFailure -> { result.isSuccess -> {
Log.e(TAG, "Error polling closed group", nextPoll.exceptionOrNull()) delay(POLL_INTERVAL)
// Clearing snode so we get a new one next time }
snode = null
delay(POLL_INTERVAL) result.isFailure -> {
} val error = result.exceptionOrNull()!!
if (error is CancellationException) {
nextPoll.getOrNull() == null -> { throw error
// assume null poll time means don't continue polling, either the group has been deleted or something else }
Log.d(TAG, "Stopping the closed group poller")
break Log.e(TAG, "Error polling closed group", error)
} // Clearing snode so we get a new one next time
currentSnode = null
else -> { delay(POLL_INTERVAL)
delay(POLL_INTERVAL) }
}
} }
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
Log.e(TAG, "Error during group poller", e)
delay(POLL_ERROR_RETRY_DELAY)
} }
} }
} }

View File

@ -296,22 +296,22 @@ object OnionRequestAPI {
is Destination.Snode -> destination.snode is Destination.Snode -> destination.snode
is Destination.Server -> null is Destination.Server -> null
} }
return getPath(snodeToExclude).bind { path -> return getPath(snodeToExclude).map { path ->
guardSnode = path.first() guardSnode = path.first()
// Encrypt in reverse order, i.e. the destination first // Encrypt in reverse order, i.e. the destination first
OnionRequestEncryption.encryptPayloadForDestination(payload, destination, version).bind { r -> OnionRequestEncryption.encryptPayloadForDestination(payload, destination, version).let { r ->
destinationSymmetricKey = r.symmetricKey destinationSymmetricKey = r.symmetricKey
// Recursively encrypt the layers of the onion (again in reverse order) // Recursively encrypt the layers of the onion (again in reverse order)
encryptionResult = r encryptionResult = r
@Suppress("NAME_SHADOWING") var path = path @Suppress("NAME_SHADOWING") var path = path
var rhs = destination var rhs = destination
fun addLayer(): Promise<EncryptionResult, Exception> { fun addLayer(): EncryptionResult {
return if (path.isEmpty()) { return if (path.isEmpty()) {
Promise.of(encryptionResult) encryptionResult
} else { } else {
val lhs = Destination.Snode(path.last()) val lhs = Destination.Snode(path.last())
path = path.dropLast(1) path = path.dropLast(1)
OnionRequestEncryption.encryptHop(lhs, rhs, encryptionResult).bind { r -> OnionRequestEncryption.encryptHop(lhs, rhs, encryptionResult).let { r ->
encryptionResult = r encryptionResult = r
rhs = lhs rhs = lhs
addLayer() addLayer()

View File

@ -38,57 +38,53 @@ object OnionRequestEncryption {
payload: ByteArray, payload: ByteArray,
destination: Destination, destination: Destination,
version: Version version: Version
): Promise<EncryptionResult, Exception> { ): EncryptionResult {
return GlobalScope.asyncPromise { val plaintext = if (version == Version.V4) {
val plaintext = if (version == Version.V4) { payload
payload } else {
} else { // Wrapping isn't needed for file server or open group onion requests
// Wrapping isn't needed for file server or open group onion requests when (destination) {
when (destination) { is Destination.Snode -> encode(payload, mapOf("headers" to ""))
is Destination.Snode -> encode(payload, mapOf("headers" to "")) is Destination.Server -> payload
is Destination.Server -> payload
}
} }
val x25519PublicKey = when (destination) {
is Destination.Snode -> destination.snode.publicKeySet!!.x25519Key
is Destination.Server -> destination.x25519PublicKey
}
AESGCM.encrypt(plaintext, x25519PublicKey)
} }
val x25519PublicKey = when (destination) {
is Destination.Snode -> destination.snode.publicKeySet!!.x25519Key
is Destination.Server -> destination.x25519PublicKey
}
return AESGCM.encrypt(plaintext, x25519PublicKey)
} }
/** /**
* Encrypts the previous encryption result (i.e. that of the hop after this one) for this hop. Use this to build the layers of an onion request. * Encrypts the previous encryption result (i.e. that of the hop after this one) for this hop. Use this to build the layers of an onion request.
*/ */
internal fun encryptHop(lhs: Destination, rhs: Destination, previousEncryptionResult: EncryptionResult): Promise<EncryptionResult, Exception> { internal fun encryptHop(lhs: Destination, rhs: Destination, previousEncryptionResult: EncryptionResult): EncryptionResult {
return GlobalScope.asyncPromise { val payload: MutableMap<String, Any> = when (rhs) {
val payload: MutableMap<String, Any> = when (rhs) { is Destination.Snode -> {
is Destination.Snode -> { mutableMapOf("destination" to rhs.snode.publicKeySet!!.ed25519Key)
mutableMapOf("destination" to rhs.snode.publicKeySet!!.ed25519Key)
}
is Destination.Server -> {
mutableMapOf(
"host" to rhs.host,
"target" to rhs.target,
"method" to "POST",
"protocol" to rhs.scheme,
"port" to rhs.port
)
}
} }
payload["ephemeral_key"] = previousEncryptionResult.ephemeralPublicKey.toHexString()
val x25519PublicKey = when (lhs) {
is Destination.Snode -> {
lhs.snode.publicKeySet!!.x25519Key
}
is Destination.Server -> { is Destination.Server -> {
lhs.x25519PublicKey mutableMapOf(
} "host" to rhs.host,
"target" to rhs.target,
"method" to "POST",
"protocol" to rhs.scheme,
"port" to rhs.port
)
} }
val plaintext = encode(previousEncryptionResult.ciphertext, payload)
AESGCM.encrypt(plaintext, x25519PublicKey)
} }
payload["ephemeral_key"] = previousEncryptionResult.ephemeralPublicKey.toHexString()
val x25519PublicKey = when (lhs) {
is Destination.Snode -> {
lhs.snode.publicKeySet!!.x25519Key
}
is Destination.Server -> {
lhs.x25519PublicKey
}
}
val plaintext = encode(previousEncryptionResult.ciphertext, payload)
return AESGCM.encrypt(plaintext, x25519PublicKey)
} }
} }

View File

@ -14,7 +14,7 @@ import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine import kotlin.coroutines.suspendCoroutine
suspend fun <T, E: Throwable> Promise<T, E>.await(): T { suspend inline fun <T, E: Throwable> Promise<T, E>.await(): T {
return suspendCoroutine { cont -> return suspendCoroutine { cont ->
success(cont::resume) success(cont::resume)
fail(cont::resumeWithException) fail(cont::resumeWithException)

View File

@ -31,7 +31,11 @@ interface ConfigFactoryProtocol {
fun mergeUserConfigs(userConfigType: UserConfigType, messages: List<ConfigMessage>) fun mergeUserConfigs(userConfigType: UserConfigType, messages: List<ConfigMessage>)
fun <T> withGroupConfigs(groupId: AccountId, cb: (GroupConfigs) -> T): T fun <T> withGroupConfigs(groupId: AccountId, cb: (GroupConfigs) -> T): T
fun <T> withMutableGroupConfigs(groupId: AccountId, cb: (MutableGroupConfigs) -> T): T
/**
* @param recreateConfigInstances If true, the group configs will be recreated before calling the callback. This is useful when you have received an admin key or otherwise.
*/
fun <T> withMutableGroupConfigs(groupId: AccountId, recreateConfigInstances: Boolean = false, cb: (MutableGroupConfigs) -> T): T
fun conversationInConfig(publicKey: String?, groupPublicKey: String?, openGroupId: String?, visibleOnly: Boolean): Boolean fun conversationInConfig(publicKey: String?, groupPublicKey: String?, openGroupId: String?, visibleOnly: Boolean): Boolean
fun canPerformChange(variant: String, publicKey: String, changeTimestampMs: Long): Boolean fun canPerformChange(variant: String, publicKey: String, changeTimestampMs: Long): Boolean