Clock management and kicked

This commit is contained in:
SessionHero01 2024-10-03 12:15:51 +10:00
parent 1f5fde0d9a
commit a5c89d8d5a
No known key found for this signature in database
17 changed files with 429 additions and 290 deletions

View File

@ -48,6 +48,7 @@ import org.session.libsession.messaging.notifications.TokenFetcher;
import org.session.libsession.messaging.sending_receiving.notifications.MessageNotifier;
import org.session.libsession.messaging.sending_receiving.pollers.LegacyClosedGroupPollerV2;
import org.session.libsession.messaging.sending_receiving.pollers.Poller;
import org.session.libsession.snode.SnodeClock;
import org.session.libsession.snode.SnodeModule;
import org.session.libsession.utilities.Address;
import org.session.libsession.utilities.Device;
@ -165,6 +166,7 @@ public class ApplicationContext extends Application implements DefaultLifecycleO
MessagingModuleConfiguration messagingModuleConfiguration;
@Inject ConfigSyncHandler configSyncHandler;
@Inject RemoveGroupMemberHandler removeGroupMemberHandler;
@Inject SnodeClock snodeClock;
private volatile boolean isAppVisible;
@ -236,7 +238,8 @@ public class ApplicationContext extends Application implements DefaultLifecycleO
lastSentTimestampCache,
this,
tokenFetcher,
groupManagerV2
groupManagerV2,
snodeClock
);
callMessageProcessor = new CallMessageProcessor(this, textSecurePreferences, ProcessLifecycleOwner.get().getLifecycle(), storage);
Log.i(TAG, "onCreate()");
@ -270,6 +273,7 @@ public class ApplicationContext extends Application implements DefaultLifecycleO
pushRegistrationHandler.run();
configSyncHandler.start();
removeGroupMemberHandler.start();
snodeClock.start();
// add our shortcut debug menu if we are not in a release build
if (BuildConfig.BUILD_TYPE != "release") {

View File

@ -327,8 +327,6 @@ class ConfigFactory @Inject constructor(
cb(configs as GroupConfigsImpl)
}
Log.d("ConfigFactory", "Group updated? $groupId: $changed")
if (changed) {
if (!_configUpdateNotifications.tryEmit(ConfigUpdateNotification.GroupConfigsUpdated(groupId))) {
Log.e("ConfigFactory", "Unable to deliver group update notification")
@ -351,6 +349,7 @@ class ConfigFactory @Inject constructor(
override fun removeGroup(groupId: AccountId) {
withMutableUserConfigs {
it.userGroups.eraseClosedGroup(groupId.hexString)
it.convoInfoVolatile.eraseClosedGroup(groupId.hexString)
}
if (groupConfigs.remove(groupId) != null) {

View File

@ -14,6 +14,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.groups.GroupManagerV2
import org.session.libsession.snode.SnodeClock
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsignal.database.LokiAPIDatabaseProtocol
import org.thoughtcrime.securesms.database.ConfigDatabase
@ -52,4 +53,8 @@ object SessionUtilModule {
storage = storage,
lokiApiDatabase = lokiApiDatabase,
)
@Provides
@Singleton
fun provideSnodeClock() = SnodeClock()
}

View File

@ -8,6 +8,7 @@ import dagger.assisted.AssistedInject
import dagger.hilt.android.lifecycle.HiltViewModel
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
@ -242,14 +243,13 @@ class EditGroupViewModel @AssistedInject constructor(
mutableInProgress.value = true
// We need to use GlobalScope here because we don't want
// "removeMember" to be cancelled when the view model is cleared. This operation
// is expected to complete even if the view model is cleared.
val task = GlobalScope.launch {
// any group operation to be cancelled when the view model is cleared.
val task = GlobalScope.async {
operation()
}
try {
task.join()
task.await()
} catch (e: Exception) {
mutableError.value = e.localizedMessage.orEmpty()
} finally {

View File

@ -3,7 +3,10 @@ package org.thoughtcrime.securesms.groups
import android.content.Context
import com.google.protobuf.ByteString
import dagger.hilt.android.qualifiers.ApplicationContext
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.withContext
import network.loki.messenger.libsession_util.ConfigBase.Companion.PRIORITY_VISIBLE
@ -12,7 +15,6 @@ import network.loki.messenger.libsession_util.util.GroupInfo
import network.loki.messenger.libsession_util.util.GroupMember
import network.loki.messenger.libsession_util.util.INVITE_STATUS_FAILED
import network.loki.messenger.libsession_util.util.INVITE_STATUS_SENT
import network.loki.messenger.libsession_util.util.Sodium
import network.loki.messenger.libsession_util.util.UserPic
import org.session.libsession.database.StorageProtocol
import org.session.libsession.database.userAuth
@ -38,6 +40,7 @@ import org.session.libsession.utilities.Address
import org.session.libsession.utilities.SSKEnvironment
import org.session.libsession.utilities.getClosedGroup
import org.session.libsession.utilities.recipients.Recipient
import org.session.libsession.utilities.waitUntilGroupConfigsPushed
import org.session.libsignal.messages.SignalServiceGroup
import org.session.libsignal.protos.SignalServiceProtos.DataMessage
import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateDeleteMemberContentMessage
@ -76,10 +79,11 @@ class GroupManagerV2Impl @Inject constructor(
* @throws IllegalArgumentException if the group does not exist or no admin key is found.
*/
private fun requireAdminAccess(group: AccountId): ByteArray {
return checkNotNull(configFactory
.withUserConfigs { it.userGroups.getClosedGroup(group.hexString) }
?.adminKey
?.takeIf { it.isNotEmpty() }) { "Only admin is allowed to invite members" }
return checkNotNull(
configFactory.getClosedGroup(group)
?.adminKey
?.takeIf { it.isNotEmpty() }
) { "Only admin is allowed to invite members" }
}
override suspend fun createGroup(
@ -95,7 +99,9 @@ class GroupManagerV2Impl @Inject constructor(
// Create a group in the user groups config
val group = configFactory.withMutableUserConfigs { configs ->
configs.userGroups.createGroup().also(configs.userGroups::set)
configs.userGroups.createGroup()
.copy(name = groupName)
.also(configs.userGroups::set)
}
checkNotNull(group.adminKey) { "Admin key is null for new group creation." }
@ -133,6 +139,10 @@ class GroupManagerV2Impl @Inject constructor(
configs.rekey()
}
if (!configFactory.waitUntilGroupConfigsPushed(groupId)) {
Log.w(TAG, "Unable to push group configs in a timely manner")
}
configFactory.withMutableUserConfigs {
it.convoInfoVolatile.set(
Conversation.ClosedGroup(
@ -281,77 +291,105 @@ class GroupManagerV2Impl @Inject constructor(
removedMembers: List<AccountId>,
removeMessages: Boolean
) {
doRemoveMembers(
flagMembersForRemoval(
group = groupAccountId,
removedMembers = removedMembers,
sendRemovedMessage = true,
removeMemberMessages = removeMessages
members = removedMembers,
alsoRemoveMembersMessage = removeMessages,
sendMemberChangeMessage = true
)
}
override suspend fun handleMemberLeft(message: GroupUpdated, closedGroupId: AccountId) {
val closedGroupHexString = closedGroupId.hexString
val closedGroup =
configFactory.withUserConfigs { it.userGroups.getClosedGroup(closedGroupId.hexString) }
?: return
override suspend fun removeMemberMessages(
groupAccountId: AccountId,
members: List<AccountId>
): Unit = withContext(dispatcher) {
val messagesToDelete = mutableListOf<String>()
val threadId = storage.getThreadId(Address.fromSerialized(groupAccountId.hexString))
if (threadId != null) {
for (member in members) {
for (msg in mmsSmsDatabase.getUserMessages(threadId, member.hexString)) {
val serverHash = lokiDatabase.getMessageServerHash(msg.id, msg.isMms)
if (serverHash != null) {
messagesToDelete.add(serverHash)
}
}
storage.deleteMessagesByUser(threadId, member.hexString)
}
}
if (messagesToDelete.isEmpty()) {
return@withContext
}
val groupAdminAuth = configFactory.getClosedGroup(groupAccountId)?.adminKey?.let {
OwnedSwarmAuth.ofClosedGroup(groupAccountId, it)
} ?: return@withContext
SnodeAPI.deleteMessage(groupAccountId.hexString, groupAdminAuth, messagesToDelete).await()
}
override suspend fun handleMemberLeft(message: GroupUpdated, group: AccountId) {
val closedGroup = configFactory.getClosedGroup(group) ?: return
if (closedGroup.hasAdminKey()) {
// re-key and do a new config removing the previous member
doRemoveMembers(
closedGroupId,
listOf(AccountId(message.sender!!)),
sendRemovedMessage = false,
removeMemberMessages = false
flagMembersForRemoval(
group = group,
members = listOf(AccountId(message.sender!!)),
alsoRemoveMembersMessage = false,
sendMemberChangeMessage = false
)
} else {
val hasAnyAdminRemaining = configFactory.withGroupConfigs(closedGroupId) { configs ->
val hasAnyAdminRemaining = configFactory.withGroupConfigs(group) { configs ->
configs.groupMembers.all()
.asSequence()
.filterNot { it.sessionId == message.sender }
.any { it.admin && !it.removed }
}
// if the leaving member is an admin, disable the group and remove it
// This is just to emulate the "existing" group behaviour, this will need to be removed in future
// if the leaving member is last admin, disable the group and remove it
// This is just to emulate the "existing" group behaviour, this will probably be removed in future
if (!hasAnyAdminRemaining) {
pollerFactory.pollerFor(closedGroupId)?.stop()
storage.getThreadId(Address.fromSerialized(closedGroupHexString))
pollerFactory.pollerFor(group)?.stop()
storage.getThreadId(Address.fromSerialized(group.hexString))
?.let(storage::deleteConversation)
configFactory.removeGroup(closedGroupId)
configFactory.removeGroup(group)
}
}
}
override suspend fun leaveGroup(group: AccountId, deleteOnLeave: Boolean) {
val canSendGroupMessage =
configFactory.withUserConfigs { it.userGroups.getClosedGroup(group.hexString) }?.kicked != true
val address = Address.fromSerialized(group.hexString)
val canSendGroupMessage = configFactory.getClosedGroup(group)?.kicked == false
if (canSendGroupMessage) {
MessageSender.sendNonDurably(
message = GroupUpdated(
val destination = Destination.ClosedGroup(group.hexString)
MessageSender.send(
GroupUpdated(
GroupUpdateMessage.newBuilder()
.setMemberLeftMessage(DataMessage.GroupUpdateMemberLeftMessage.getDefaultInstance())
.build()
),
address = address,
destination,
isSyncMessage = false
).await()
MessageSender.sendNonDurably(
message = GroupUpdated(
MessageSender.send(
GroupUpdated(
GroupUpdateMessage.newBuilder()
.setMemberLeftNotificationMessage(DataMessage.GroupUpdateMemberLeftNotificationMessage.getDefaultInstance())
.build()
),
address = address,
destination,
isSyncMessage = false
).await()
}
pollerFactory.pollerFor(group)?.stop()
// TODO: set "deleted" and post to -10 group namespace?
if (deleteOnLeave) {
storage.getThreadId(address)?.let(storage::deleteConversation)
storage.getThreadId(Address.fromSerialized(group.hexString))
?.let(storage::deleteConversation)
configFactory.removeGroup(group)
}
}
@ -359,25 +397,25 @@ class GroupManagerV2Impl @Inject constructor(
override suspend fun promoteMember(
group: AccountId,
members: List<AccountId>
): Unit = withContext(dispatcher) {
): Unit = withContext(dispatcher + SupervisorJob()) {
val adminKey = requireAdminAccess(group)
val groupName = configFactory.withGroupConfigs(group) { it.groupInfo.getName() }
// Send out the promote message to the members concurrently
val promoteMessage = GroupUpdated(
GroupUpdateMessage.newBuilder()
.setPromoteMessage(
DataMessage.GroupUpdatePromoteMessage.newBuilder()
.setGroupIdentitySeed(ByteString.copyFrom(adminKey))
.setName(groupName)
)
.build()
)
val promotionDeferred = members.associateWith { member ->
async {
val message = GroupUpdated(
GroupUpdateMessage.newBuilder()
.setPromoteMessage(
DataMessage.GroupUpdatePromoteMessage.newBuilder()
.setGroupIdentitySeed(ByteString.copyFrom(adminKey))
.setName(groupName)
)
.build()
)
MessageSender.sendNonDurably(
message = message,
message = promoteMessage,
address = Address.fromSerialized(member.hexString),
isSyncMessage = false
).await()
@ -428,125 +466,25 @@ class GroupManagerV2Impl @Inject constructor(
storage.insertGroupInfoChange(message, group)
}
private suspend fun doRemoveMembers(
group: AccountId,
removedMembers: List<AccountId>,
sendRemovedMessage: Boolean,
removeMemberMessages: Boolean
) = withContext(dispatcher) {
private suspend fun flagMembersForRemoval(
group: AccountId, members: List<AccountId>,
alsoRemoveMembersMessage: Boolean,
sendMemberChangeMessage: Boolean
) {
val adminKey = requireAdminAccess(group)
val groupAuth = OwnedSwarmAuth.ofClosedGroup(group, adminKey)
// To remove a member from a group, we need to first:
// 1. Notify the swarm that this member's key has bene revoked
// 2. Send a "kicked" message to a special namespace that the kicked member can still read
// 3. Optionally, send "delete member messages" to the group. (So that every device in the group
// delete this member's messages locally.)
// These three steps will be included in a sequential call as they all need to be done in order.
// After these steps are all done, we will do the following:
// Update the group configs to remove the member, sync if needed, then
// delete the member's messages locally and remotely.
val essentialRequests = configFactory.withGroupConfigs(group) { configs ->
val messageSendTimestamp = SnodeAPI.nowWithOffset
buildList {
this += SnodeAPI.buildAuthenticatedRevokeSubKeyBatchRequest(
groupAdminAuth = groupAuth,
subAccountTokens = removedMembers.map(configs.groupKeys::getSubAccountToken)
)
this += Sodium.encryptForMultipleSimple(
messages = removedMembers.map { "${it.hexString}-${configs.groupKeys.currentGeneration()}".encodeToByteArray() }
.toTypedArray(),
recipients = removedMembers.map { it.pubKeyBytes }.toTypedArray(),
ed25519SecretKey = adminKey,
domain = Sodium.KICKED_DOMAIN
).let { encryptedForMembers ->
SnodeAPI.buildAuthenticatedStoreBatchInfo(
namespace = Namespace.REVOKED_GROUP_MESSAGES(),
message = SnodeMessage(
recipient = group.hexString,
data = Base64.encodeBytes(encryptedForMembers),
ttl = SnodeMessage.CONFIG_TTL,
timestamp = messageSendTimestamp
),
auth = groupAuth
)
}
if (removeMemberMessages) {
val adminSignature =
SodiumUtilities.sign(
buildDeleteMemberContentSignature(
memberIds = removedMembers,
messageHashes = emptyList(),
timestamp = messageSendTimestamp
), adminKey
)
this += SnodeAPI.buildAuthenticatedStoreBatchInfo(
namespace = Namespace.CLOSED_GROUP_MESSAGES(),
message = MessageSender.buildWrappedMessageToSnode(
destination = Destination.ClosedGroup(group.hexString),
message = GroupUpdated(
GroupUpdateMessage.newBuilder()
.setDeleteMemberContent(
GroupUpdateDeleteMemberContentMessage.newBuilder()
.addAllMemberSessionIds(removedMembers.map { it.hexString })
.setAdminSignature(ByteString.copyFrom(adminSignature))
)
.build()
).apply { sentTimestamp = messageSendTimestamp },
isSyncMessage = false
),
auth = groupAuth
)
}
}
}
val snode = SnodeAPI.getSingleTargetSnode(group.hexString).await()
val responses = SnodeAPI.getBatchResponse(
snode,
group.hexString,
essentialRequests,
sequence = true
)
responses.requireAllRequestsSuccessful("Failed to execute essential steps for removing member")
// Next step: update group configs, rekey, remove member messages if required
// 1. Mark the members as removed in the group configs
configFactory.withMutableGroupConfigs(group) { configs ->
removedMembers.forEach { configs.groupMembers.erase(it.hexString) }
configs.rekey()
}
if (removeMemberMessages) {
val threadId = storage.getThreadId(Address.fromSerialized(group.hexString))
if (threadId != null) {
val messagesToDelete = mutableListOf<String>()
for (member in removedMembers) {
for (msg in mmsSmsDatabase.getUserMessages(threadId, member.hexString)) {
val serverHash = lokiDatabase.getMessageServerHash(msg.id, msg.isMms)
if (serverHash != null) {
messagesToDelete.add(serverHash)
}
}
storage.deleteMessagesByUser(threadId, member.hexString)
for (member in members) {
val memberConfig = configs.groupMembers.get(member.hexString)
if (memberConfig != null) {
configs.groupMembers.set(memberConfig.setRemoved(alsoRemoveMembersMessage))
}
SnodeAPI.sendBatchRequest(
snode, group.hexString, SnodeAPI.buildAuthenticatedDeleteBatchInfo(
groupAuth,
messagesToDelete
)
)
}
}
if (sendRemovedMessage) {
// 2. Send a member change message
if (sendMemberChangeMessage) {
val timestamp = SnodeAPI.nowWithOffset
val signature = SodiumUtilities.sign(
buildMemberChangeSignature(
@ -559,7 +497,7 @@ class GroupManagerV2Impl @Inject constructor(
val updateMessage = GroupUpdateMessage.newBuilder()
.setMemberChangeMessage(
GroupUpdateMemberChangeMessage.newBuilder()
.addAllMemberSessionIds(removedMembers.map { it.hexString })
.addAllMemberSessionIds(members.map { it.hexString })
.setType(GroupUpdateMemberChangeMessage.Type.REMOVED)
.setAdminSignature(ByteString.copyFrom(signature))
)
@ -567,8 +505,8 @@ class GroupManagerV2Impl @Inject constructor(
val message = GroupUpdated(
updateMessage
).apply { sentTimestamp = timestamp }
MessageSender.send(message, Destination.ClosedGroup(group.hexString), false)
storage.insertGroupInfoChange(message, group)
MessageSender.send(message, Destination.ClosedGroup(group.hexString), false).await()
}
}
@ -671,8 +609,7 @@ class GroupManagerV2Impl @Inject constructor(
promoteMessageHash: String?
) = withContext(dispatcher) {
val userAuth = requireNotNull(storage.userAuth) { "No current user available" }
val group =
configFactory.withUserConfigs { it.userGroups.getClosedGroup(groupId.hexString) }
val group = configFactory.getClosedGroup(groupId)
if (group == null) {
// If we haven't got the group in the config, it could mean that we haven't
@ -692,12 +629,13 @@ class GroupManagerV2Impl @Inject constructor(
}
// Update our promote state
configFactory.withMutableGroupConfigs(recreateConfigInstances = true, groupId = groupId) { configs ->
configFactory.withMutableGroupConfigs(
recreateConfigInstances = true,
groupId = groupId
) { configs ->
configs.groupMembers.get(userAuth.accountId.hexString)?.let { member ->
configs.groupMembers.set(member.setPromoteSuccess())
}
Unit
}
}
@ -729,7 +667,7 @@ class GroupManagerV2Impl @Inject constructor(
inviter: AccountId,
) {
// If we have already received an invitation in the past, we should not process this one
if (configFactory.withUserConfigs { it.userGroups.getClosedGroup(groupId.hexString) }?.invited == true) {
if (configFactory.getClosedGroup(groupId)?.invited == true) {
return
}
@ -893,8 +831,11 @@ class GroupManagerV2Impl @Inject constructor(
// If we are admin, we can delete the messages from the group swarm
group.adminKey?.let { adminKey ->
SnodeAPI.deleteMessage(groupId.hexString, OwnedSwarmAuth.ofClosedGroup(groupId, adminKey), messageHashes)
.await()
SnodeAPI.deleteMessage(
publicKey = groupId.hexString,
swarmAuth = OwnedSwarmAuth.ofClosedGroup(groupId, adminKey),
serverHashes = messageHashes
).await()
}
// Construct a message to ask members to delete the messages, sign if we are admin, then send
@ -978,8 +919,11 @@ class GroupManagerV2Impl @Inject constructor(
groupId.hexString
)
) {
SnodeAPI.deleteMessage(groupId.hexString, OwnedSwarmAuth.ofClosedGroup(groupId, adminKey), hashes)
.await()
SnodeAPI.deleteMessage(
groupId.hexString,
OwnedSwarmAuth.ofClosedGroup(groupId, adminKey),
hashes
).await()
}
// The non-admin user shouldn't be able to delete other user's messages so we will

View File

@ -1,9 +1,11 @@
package network.loki.messenger.libsession_util.util
import java.util.regex.Pattern
object Sodium {
const val KICKED_DOMAIN = "SessionGroupKickedMessage"
val KICKED_REGEX = Regex("05\\w{64}-\\d+")
val KICKED_REGEX: Pattern = Pattern.compile("^(05[a-zA-Z0-9]{64})(\\d+)$")
init {
System.loadLibrary("session_util")

View File

@ -7,6 +7,7 @@ import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.groups.GroupManagerV2
import org.session.libsession.messaging.notifications.TokenFetcher
import org.session.libsession.snode.OwnedSwarmAuth
import org.session.libsession.snode.SnodeClock
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.Device
import org.session.libsession.utilities.Toaster
@ -22,6 +23,7 @@ class MessagingModuleConfiguration(
val toaster: Toaster,
val tokenFetcher: TokenFetcher,
val groupManagerV2: GroupManagerV2,
val clock: SnodeClock,
) {
companion object {

View File

@ -29,7 +29,21 @@ interface GroupManagerV2 {
removeMessages: Boolean
)
suspend fun handleMemberLeft(message: GroupUpdated, closedGroupId: AccountId)
/**
* Remove all messages from the group for the given members.
*
* This will delete all messages locally, and, if the user is an admin, remotely as well.
*
* Note: unlike [handleDeleteMemberContent], [requestMessageDeletion], this method
* does not try to validate the validity of the request, it also does not ask other members
* to delete the messages. It simply removes what it can.
*/
suspend fun removeMemberMessages(
groupAccountId: AccountId,
members: List<AccountId>
)
suspend fun handleMemberLeft(message: GroupUpdated, group: AccountId)
suspend fun leaveGroup(group: AccountId, deleteOnLeave: Boolean)
@ -59,8 +73,22 @@ interface GroupManagerV2 {
suspend fun setName(groupId: AccountId, newName: String)
/**
* Send a request to the group to delete the given messages.
*
* It can be called by a regular member who wishes to delete their own messages.
* It can also called by an admin, who can delete any messages from any member.
*/
suspend fun requestMessageDeletion(groupId: AccountId, messageHashes: List<String>)
/**
* Handle a request to delete a member's content from the group. This is called when we receive
* a message from the server that a member's content needs to be deleted. (usually sent by
* [requestMessageDeletion], for example)
*
* In contrast to [removeMemberMessages], where it will remove the messages blindly, this method
* will check if the right conditions are met before removing the messages.
*/
suspend fun handleDeleteMemberContent(
groupId: AccountId,
deleteMemberContent: GroupUpdateDeleteMemberContentMessage,

View File

@ -2,10 +2,8 @@ package org.session.libsession.messaging.groups
import android.os.SystemClock
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.first
@ -23,6 +21,7 @@ import org.session.libsession.snode.SnodeMessage
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsession.utilities.waitUntilGroupConfigsPushed
import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateMessage
import org.session.libsignal.utilities.AccountId
@ -35,17 +34,22 @@ private const val TAG = "RemoveGroupMemberHandler"
private const val MIN_PROCESS_INTERVAL_MILLS = 1_000L
/**
* This handler is responsible for processing pending group member removals.
*
* It automatically does so by listening to the config updates changes and checking for any pending removals.
*/
class RemoveGroupMemberHandler @Inject constructor(
private val configFactory: ConfigFactoryProtocol,
private val textSecurePreferences: TextSecurePreferences,
private val groupManager: GroupManagerV2,
) {
private val scope: CoroutineScope = GlobalScope
private var job: Job? = null
fun start() {
require(job == null) { "Already started" }
job = scope.launch {
job = GlobalScope.launch {
while (true) {
// Make sure we have a local number before we start processing
textSecurePreferences.watchLocalNumber().first { it != null }
@ -74,74 +78,54 @@ class RemoveGroupMemberHandler @Inject constructor(
}
private suspend fun processPendingMemberRemoval() {
// Run the removal process for each group in parallel
val removalTasks = configFactory.withUserConfigs { it.userGroups.allClosedGroupInfo() }
configFactory.withUserConfigs { it.userGroups.allClosedGroupInfo() }
.asSequence()
.filter { it.hasAdminKey() }
.associate { group ->
group.name to scope.async {
processPendingRemovalsForGroup(
groupAccountId = group.groupAccountId,
groupName = group.name,
adminKey = group.adminKey!!
)
}
.forEach { group ->
processPendingRemovalsForGroup(group.groupAccountId, group.adminKey!!)
}
// Wait and collect the results of the removal tasks
for ((groupName, task) in removalTasks) {
try {
task.await()
} catch (e: Exception) {
Log.e(TAG, "Error processing pending removals for group $groupName", e)
}
}
}
private suspend fun processPendingRemovalsForGroup(
groupAccountId: AccountId,
groupName: String,
adminKey: ByteArray
) {
val swarmAuth = OwnedSwarmAuth(
accountId = groupAccountId,
ed25519PublicKeyHex = null,
ed25519PrivateKey = adminKey
)
val groupAuth = OwnedSwarmAuth.ofClosedGroup(groupAccountId, adminKey)
val batchCalls = configFactory.withGroupConfigs(groupAccountId) { configs ->
val (pendingRemovals, batchCalls) = configFactory.withGroupConfigs(groupAccountId) { configs ->
val pendingRemovals = configs.groupMembers.all().filter { it.removed }
if (pendingRemovals.isEmpty()) {
// Skip if there are no pending removals
return@withGroupConfigs emptyList()
return@withGroupConfigs pendingRemovals to emptyList()
}
Log.d(TAG, "Processing ${pendingRemovals.size} pending removals for group $groupName")
Log.d(TAG, "Processing ${pendingRemovals.size} pending removals for group")
// Perform a sequential call to group snode to:
// 1. Revoke the member's sub key (by adding the key to a "revoked list" under the hood)
// 2. Send a message to a special namespace to inform the removed members they have been removed
// 3. Conditionally, delete removed-members' messages from the group's message store, if that option is selected by the actioning admin
// 2. Send a message to a special namespace on the group to inform the removed members they have been removed
// 3. Conditionally, send a `GroupUpdateDeleteMemberContent` to the group so the message deletion
// can be performed by everyone in the group.
val calls = ArrayList<SnodeAPI.SnodeBatchRequestInfo>(3)
// Call No 1. Revoke sub-key. This call is crucial and must not fail for the rest of the operation to be successful.
calls += checkNotNull(
SnodeAPI.buildAuthenticatedRevokeSubKeyBatchRequest(
groupAdminAuth = swarmAuth,
groupAdminAuth = groupAuth,
subAccountTokens = pendingRemovals.map {
configs.groupKeys.getSubAccountToken(AccountId(it.sessionId))
}
)
) { "Fail to create a revoke request" }
// Call No 2. Send a message to the removed members
// Call No 2. Send a "kicked" message to the revoked namespace
calls += SnodeAPI.buildAuthenticatedStoreBatchInfo(
namespace = Namespace.REVOKED_GROUP_MESSAGES(),
message = buildGroupKickMessage(groupAccountId.hexString, pendingRemovals, configs.groupKeys, adminKey),
auth = swarmAuth,
auth = groupAuth,
)
// Call No 3. Conditionally remove the message from the group's message store
// Call No 3. Conditionally send the `GroupUpdateDeleteMemberContent`
if (pendingRemovals.any { it.shouldRemoveMessages }) {
calls += SnodeAPI.buildAuthenticatedStoreBatchInfo(
namespace = Namespace.CLOSED_GROUP_MESSAGES(),
@ -152,11 +136,11 @@ class RemoveGroupMemberHandler @Inject constructor(
.filter { it.shouldRemoveMessages }
.map { it.sessionId }
),
auth = swarmAuth,
auth = groupAuth,
)
}
calls
pendingRemovals to (calls as List<SnodeAPI.SnodeBatchRequestInfo>)
}
if (batchCalls.isEmpty()) {
@ -164,9 +148,39 @@ class RemoveGroupMemberHandler @Inject constructor(
}
val node = SnodeAPI.getSingleTargetSnode(groupAccountId.hexString).await()
SnodeAPI.getBatchResponse(node, groupAccountId.hexString, batchCalls, true)
val response = SnodeAPI.getBatchResponse(node, groupAccountId.hexString, batchCalls, sequence = true)
//TODO: Handle message removal
val firstError = response.results.firstOrNull { !it.isSuccessful }
check(firstError == null) {
"Error processing pending removals for group: code = ${firstError?.code}, body = ${firstError?.body}"
}
Log.d(TAG, "Essential steps for group removal are done")
// The essential part of the operation has been successful once we get to this point,
// now we can go ahead and update the configs
configFactory.withMutableGroupConfigs(groupAccountId) { configs ->
pendingRemovals.forEach(configs.groupMembers::erase)
configs.rekey()
}
configFactory.waitUntilGroupConfigsPushed(groupAccountId)
Log.d(TAG, "Group configs updated")
// Try to delete members' message. It's ok to fail as they will be re-tried in different
// cases (a.k.a the GroupUpdateDeleteMemberContent message handling) and could be by different admins.
val deletingMessagesForMembers = pendingRemovals.filter { it.shouldRemoveMessages }
if (deletingMessagesForMembers.isNotEmpty()) {
try {
groupManager.removeMemberMessages(
groupAccountId,
deletingMessagesForMembers.map { AccountId(it.sessionId) }
)
} catch (e: Exception) {
Log.e(TAG, "Error deleting messages for removed members", e)
}
}
}
private fun buildDeleteGroupMemberContentMessage(
@ -211,7 +225,7 @@ class RemoveGroupMemberHandler @Inject constructor(
domain = Sodium.KICKED_DOMAIN
)
),
ttl = SnodeMessage.CONFIG_TTL,
ttl = SnodeMessage.DEFAULT_TTL,
timestamp = SnodeAPI.nowWithOffset
)
}

View File

@ -5,6 +5,7 @@ import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.snode.SnodeMessage
import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.protos.SignalServiceProtos.Content.ExpirationType
@ -25,7 +26,7 @@ abstract class Message {
open val coerceDisappearAfterSendToRead = false
open val defaultTtl: Long = 14 * 24 * 60 * 60 * 1000
open val defaultTtl: Long = SnodeMessage.DEFAULT_TTL
open val ttl: Long get() = specifiedTtl ?: defaultTtl
open val isSelfSendValid: Boolean = false

View File

@ -55,6 +55,7 @@ import org.session.libsignal.crypto.ecc.DjbECPublicKey
import org.session.libsignal.crypto.ecc.ECKeyPair
import org.session.libsignal.messages.SignalServiceGroup
import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateMemberChangeMessage
import org.session.libsignal.protos.SignalServiceProtos.SharedConfigMessage
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Base64

View File

@ -5,10 +5,10 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import network.loki.messenger.libsession_util.util.Sodium
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.groups.GroupManagerV2
@ -22,7 +22,6 @@ import org.session.libsession.snode.model.RetrieveMessageResponse
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.ConfigMessage
import org.session.libsession.utilities.getClosedGroup
import org.session.libsignal.database.LokiAPIDatabaseProtocol
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Log
@ -102,9 +101,9 @@ class ClosedGroupPoller(
job = null
}
private suspend fun poll(snode: Snode): Unit = coroutineScope {
private suspend fun poll(snode: Snode): Unit = supervisorScope {
val groupAuth =
configFactoryProtocol.getGroupAuth(closedGroupSessionId) ?: return@coroutineScope
configFactoryProtocol.getGroupAuth(closedGroupSessionId) ?: return@supervisorScope
val configHashesToExtends = configFactoryProtocol.withGroupConfigs(closedGroupSessionId) {
buildSet {
addAll(it.groupKeys.currentHashes())
@ -121,23 +120,21 @@ class ClosedGroupPoller(
val pollingTasks = mutableListOf<Pair<String, Deferred<*>>>()
pollingTasks += "retrieving revoked messages" to async {
handleRevoked(
SnodeAPI.sendBatchRequest(
snode,
closedGroupSessionId.hexString,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode,
closedGroupSessionId.hexString,
Namespace.REVOKED_GROUP_MESSAGES()
).orEmpty(),
auth = groupAuth,
namespace = Namespace.REVOKED_GROUP_MESSAGES(),
maxSize = null,
),
RetrieveMessageResponse::class.java
)
val receiveRevokeMessage = async {
SnodeAPI.sendBatchRequest(
snode,
closedGroupSessionId.hexString,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode,
closedGroupSessionId.hexString,
Namespace.REVOKED_GROUP_MESSAGES()
).orEmpty(),
auth = groupAuth,
namespace = Namespace.REVOKED_GROUP_MESSAGES(),
maxSize = null,
),
RetrieveMessageResponse::class.java
)
}
@ -198,18 +195,28 @@ class ClosedGroupPoller(
}
}
// The retrieval of the config and regular messages can be done concurrently,
// The retrieval of the all group messages can be done concurrently,
// however, in order for the messages to be able to be decrypted, the config messages
// must be processed first.
pollingTasks += "polling and handling group config keys and messages" to async {
val (keysMessage, infoMessage, membersMessage) = groupConfigRetrieval.map { it.await() }
saveLastMessageHash(snode, keysMessage, Namespace.ENCRYPTION_KEYS())
saveLastMessageHash(snode, infoMessage, Namespace.CLOSED_GROUP_INFO())
saveLastMessageHash(snode, membersMessage, Namespace.CLOSED_GROUP_MEMBERS())
handleGroupConfigMessages(keysMessage, infoMessage, membersMessage)
val result = runCatching {
val (keysMessage, infoMessage, membersMessage) = groupConfigRetrieval.map { it.await() }
handleGroupConfigMessages(keysMessage, infoMessage, membersMessage)
saveLastMessageHash(snode, keysMessage, Namespace.ENCRYPTION_KEYS())
saveLastMessageHash(snode, infoMessage, Namespace.CLOSED_GROUP_INFO())
saveLastMessageHash(snode, membersMessage, Namespace.CLOSED_GROUP_MEMBERS())
val regularMessages = groupMessageRetrieval.await()
handleMessages(regularMessages, snode)
val regularMessages = groupMessageRetrieval.await()
handleMessages(regularMessages, snode)
}
// Revoke message must be handled regardless, and at the end
val revokedMessages = receiveRevokeMessage.await()
handleRevoked(revokedMessages)
saveLastMessageHash(snode, revokedMessages, Namespace.REVOKED_GROUP_MESSAGES())
// Propagate any prior exceptions
result.getOrThrow()
}
// Wait for all tasks to complete, gather any exceptions happened during polling
@ -254,23 +261,23 @@ class ClosedGroupPoller(
)
if (decoded != null) {
Log.d(TAG, "decoded kick message was for us")
val message = decoded.decodeToString()
if (Sodium.KICKED_REGEX.matches(message)) {
val (sessionId, generation) = message.split("-")
val currentKeysGeneration by lazy {
configFactoryProtocol.withGroupConfigs(closedGroupSessionId) {
it.groupKeys.currentGeneration()
}
val matcher = Sodium.KICKED_REGEX.matcher(message)
if (matcher.matches()) {
val sessionId = matcher.group(1)
val messageGeneration = matcher.group(2)!!.toInt()
val currentKeysGeneration = configFactoryProtocol.withGroupConfigs(closedGroupSessionId) {
it.groupKeys.currentGeneration()
}
if (sessionId == storage.getUserPublicKey() && generation.toInt() >= currentKeysGeneration) {
try {
groupManagerV2.handleKicked(closedGroupSessionId)
} catch (e: Exception) {
Log.e("GroupPoller", "Error handling kicked message: $e")
}
val isForMe = sessionId == storage.getUserPublicKey()
Log.d(TAG, "Received kicked message, for us? ${sessionId == storage.getUserPublicKey()}, message key generation = $messageGeneration, our key generation = $currentKeysGeneration")
if (isForMe && messageGeneration >= currentKeysGeneration) {
groupManagerV2.handleKicked(closedGroupSessionId)
}
} else {
Log.w(TAG, "Received an invalid kicked message")
}
}
}

View File

@ -1,6 +1,5 @@
package org.session.libsession.snode
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import nl.komponents.kovenant.Deferred
@ -26,12 +25,10 @@ import org.session.libsignal.utilities.HTTP
import org.session.libsignal.utilities.JsonUtil
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Snode
import org.session.libsignal.utilities.ThreadUtils
import org.session.libsignal.utilities.recover
import org.session.libsignal.utilities.toHexString
import java.util.concurrent.atomic.AtomicReference
import kotlin.collections.set
import kotlin.coroutines.EmptyCoroutineContext
private typealias Path = List<Snode>
@ -603,11 +600,7 @@ object OnionRequestAPI {
val bodyAsString = json["body"] as String
JsonUtil.fromJson(bodyAsString, Map::class.java)
}
if (body["t"] != null) {
val timestamp = body["t"] as Long
val offset = timestamp - System.currentTimeMillis()
SnodeAPI.clockOffset = offset
}
if (body.containsKey("hf")) {
@Suppress("UNCHECKED_CAST")
val currentHf = body["hf"] as List<Int>

View File

@ -25,6 +25,7 @@ import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map
import nl.komponents.kovenant.task
import nl.komponents.kovenant.unwrap
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.utilities.MessageWrapper
import org.session.libsession.messaging.utilities.SodiumUtilities.sodium
import org.session.libsession.snode.model.BatchResponse
@ -47,6 +48,7 @@ import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Snode
import org.session.libsignal.utilities.prettifiedDescription
import org.session.libsignal.utilities.retryIfNeeded
import java.util.Date
import java.util.Locale
import kotlin.collections.component1
import kotlin.collections.component2
@ -63,15 +65,11 @@ object SnodeAPI {
internal var snodePool: Set<Snode>
get() = database.getSnodePool()
set(newValue) { database.setSnodePool(newValue) }
/**
* The offset between the user's clock and the Service Node's clock. Used in cases where the
* user's clock is incorrect.
*/
internal var clockOffset = 0L
@Deprecated("Use a dependency injected SnodeClock.currentTimeMills() instead")
@JvmStatic
val nowWithOffset
get() = System.currentTimeMillis() + clockOffset
get() = MessagingModuleConfiguration.shared.clock.currentTimeMills()
internal var forkInfo by observable(database.getForkInfo()) { _, oldValue, newValue ->
if (newValue > oldValue) {
@ -418,7 +416,6 @@ object SnodeAPI {
namespace = namespace,
auth = auth,
verificationData = { ns, t -> "${Snode.Method.SendMessage.rawValue}$ns$t" },
timestamp = message.timestamp
) {
putAll(message.toJSON())
}
@ -785,7 +782,7 @@ object SnodeAPI {
parseRawMessagesResponse(resp, snode, auth.accountId.hexString)
}
private fun getNetworkTime(snode: Snode): Promise<Pair<Snode, Long>, Exception> =
fun getNetworkTime(snode: Snode): Promise<Pair<Snode, Long>, Exception> =
invoke(Snode.Method.Info, snode, emptyMap()).map { rawResponse ->
val timestamp = rawResponse["timestamp"] as? Long ?: -1
snode to timestamp
@ -805,13 +802,15 @@ object SnodeAPI {
"Message sent to ${message.recipient} but authenticated with ${auth.accountId.hexString}"
}
val timestamp = nowWithOffset
buildAuthenticatedParameters(
auth = auth,
namespace = namespace,
verificationData = { ns, t -> "${Snode.Method.SendMessage.rawValue}$ns$t" },
timestamp = message.timestamp
timestamp = timestamp
) {
put("sig_timestamp", message.timestamp)
put("sig_timestamp", timestamp)
putAll(message.toJSON())
}
} else {
@ -921,7 +920,7 @@ object SnodeAPI {
fun deleteAllMessages(auth: SwarmAuth): Promise<Map<String, Boolean>, Exception> =
scope.retrySuspendAsPromise(maxRetryCount) {
val snode = getSingleTargetSnode(auth.accountId.hexString).await()
val (_, timestamp) = getNetworkTime(snode).await()
val timestamp = MessagingModuleConfiguration.shared.clock.waitForNetworkAdjustedTime()
val params = buildAuthenticatedParameters(
auth = auth,

View File

@ -0,0 +1,88 @@
package org.session.libsession.snode
import android.os.SystemClock
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import org.session.libsession.snode.utilities.await
import org.session.libsignal.utilities.Log
import java.util.Date
/**
* A class that manages the network time by querying the network time from a random snode. The
* primary goal of this class is to provide a time that is not tied to current system time and not
* prone to time changes locally.
*
* Before the first network query is successfully, calling [currentTimeMills] will return the current
* system time.
*/
class SnodeClock() {
private val instantState = MutableStateFlow<Instant?>(null)
private var job: Job? = null
fun start() {
require(job == null) { "Already started" }
job = GlobalScope.launch {
while (true) {
try {
val node = SnodeAPI.getRandomSnode().await()
val requestStarted = SystemClock.uptimeMillis()
var networkTime = SnodeAPI.getNetworkTime(node).await().second
val requestEnded = SystemClock.uptimeMillis()
// Adjust the network time to account for the time it took to make the request
// so that the network time equals to the time when the request was started
networkTime -= (requestEnded - requestStarted) / 2
val inst = Instant(requestStarted, networkTime)
Log.d("SnodeClock", "Network time: ${Date(inst.now())}, system time: ${Date()}")
instantState.value = inst
} catch (e: Exception) {
Log.e("SnodeClock", "Failed to get network time. Retrying in a few seconds", e)
} finally {
// Retry frequently if we haven't got any result before
val delayMills = if (instantState.value == null) {
3_000L
} else {
3600_000L
}
delay(delayMills)
}
}
}
}
/**
* Wait for the network adjusted time to come through.
*/
suspend fun waitForNetworkAdjustedTime(): Long {
return instantState.filterNotNull().first().now()
}
/**
* Get the current time in milliseconds. If the network time is not available yet, this method
* will return the current system time.
*/
fun currentTimeMills(): Long {
return instantState.value?.now() ?: System.currentTimeMillis()
}
private class Instant(
val systemUptime: Long,
val networkTime: Long,
) {
fun now(): Long {
val elapsed = SystemClock.uptimeMillis() - systemUptime
return networkTime + elapsed
}
}
}

View File

@ -32,6 +32,7 @@ data class SnodeMessage(
}
companion object {
const val CONFIG_TTL: Long = 30 * 24 * 60 * 60 * 1000L
const val CONFIG_TTL: Long = 30 * 24 * 60 * 60 * 1000L // 30 days
const val DEFAULT_TTL: Long = 14 * 24 * 60 * 60 * 1000L // 14 days
}
}

View File

@ -1,6 +1,11 @@
package org.session.libsession.utilities
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.withTimeoutOrNull
import network.loki.messenger.libsession_util.MutableConfig
import network.loki.messenger.libsession_util.MutableContacts
import network.loki.messenger.libsession_util.MutableConversationVolatileConfig
@ -94,6 +99,52 @@ fun ConfigFactoryProtocol.getClosedGroup(groupId: AccountId): GroupInfo.ClosedGr
return withUserConfigs { it.userGroups.getClosedGroup(groupId.hexString) }
}
/**
* Wait until all user configs are pushed to the server.
*
* This function is not essential to the pushing of the configs, the config push will schedule
* itself upon changes, so this function is purely observatory.
*
* This function will check the user configs immediately, if nothing needs to be pushed, it will return immediately.
*
* @return True if all user configs are pushed, false if the timeout is reached.
*/
suspend fun ConfigFactoryProtocol.waitUntilUserConfigsPushed(timeoutMills: Long = 10_000L): Boolean {
fun needsPush() = withUserConfigs { configs ->
UserConfigType.entries.any { configs.getConfig(it).needsPush() }
}
return withTimeoutOrNull(timeoutMills){
configUpdateNotifications
.onStart { emit(ConfigUpdateNotification.UserConfigs) } // Trigger the filtering immediately
.filter { it == ConfigUpdateNotification.UserConfigs && !needsPush() }
.first()
} != null
}
/**
* Wait until all configs of given group are pushed to the server.
*
* This function is not essential to the pushing of the configs, the config push will schedule
* itself upon changes, so this function is purely observatory.
*
* This function will check the group configs immediately, if nothing needs to be pushed, it will return immediately.
*
* @return True if all group configs are pushed, false if the timeout is reached.
*/
suspend fun ConfigFactoryProtocol.waitUntilGroupConfigsPushed(groupId: AccountId, timeoutMills: Long = 10_000L): Boolean {
fun needsPush() = withGroupConfigs(groupId) { configs ->
configs.groupInfo.needsPush() || configs.groupMembers.needsPush()
}
return withTimeoutOrNull(timeoutMills) {
configUpdateNotifications
.onStart { emit(ConfigUpdateNotification.GroupConfigsUpdated(groupId)) } // Trigger the filtering immediately
.filter { it == ConfigUpdateNotification.GroupConfigsUpdated(groupId) && !needsPush() }
.first()
} != null
}
interface UserConfigs {
val contacts: ReadableContacts
val userGroups: ReadableUserGroupsConfig