mirror of
https://github.com/oxen-io/session-android.git
synced 2024-11-25 11:05:25 +00:00
Config update details and config syncing issue
This commit is contained in:
parent
db8784f0db
commit
a8839af112
@ -4,18 +4,21 @@ import android.content.Context
|
|||||||
import dagger.hilt.android.qualifiers.ApplicationContext
|
import dagger.hilt.android.qualifiers.ApplicationContext
|
||||||
import kotlinx.coroutines.GlobalScope
|
import kotlinx.coroutines.GlobalScope
|
||||||
import kotlinx.coroutines.Job
|
import kotlinx.coroutines.Job
|
||||||
|
import kotlinx.coroutines.async
|
||||||
|
import kotlinx.coroutines.flow.debounce
|
||||||
|
import kotlinx.coroutines.flow.filterIsInstance
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
import kotlinx.coroutines.sync.Mutex
|
import kotlinx.coroutines.supervisorScope
|
||||||
import kotlinx.coroutines.sync.withLock
|
|
||||||
import network.loki.messenger.libsession_util.ConfigBase.Companion.PRIORITY_HIDDEN
|
import network.loki.messenger.libsession_util.ConfigBase.Companion.PRIORITY_HIDDEN
|
||||||
import network.loki.messenger.libsession_util.ConfigBase.Companion.PRIORITY_PINNED
|
import network.loki.messenger.libsession_util.ConfigBase.Companion.PRIORITY_PINNED
|
||||||
import network.loki.messenger.libsession_util.ReadableContacts
|
|
||||||
import network.loki.messenger.libsession_util.ReadableConversationVolatileConfig
|
|
||||||
import network.loki.messenger.libsession_util.ReadableGroupInfoConfig
|
import network.loki.messenger.libsession_util.ReadableGroupInfoConfig
|
||||||
import network.loki.messenger.libsession_util.ReadableUserGroupsConfig
|
import network.loki.messenger.libsession_util.ReadableUserGroupsConfig
|
||||||
import network.loki.messenger.libsession_util.ReadableUserProfile
|
import network.loki.messenger.libsession_util.ReadableUserProfile
|
||||||
import network.loki.messenger.libsession_util.util.BaseCommunityInfo
|
import network.loki.messenger.libsession_util.util.BaseCommunityInfo
|
||||||
|
import network.loki.messenger.libsession_util.util.Contact
|
||||||
import network.loki.messenger.libsession_util.util.Conversation
|
import network.loki.messenger.libsession_util.util.Conversation
|
||||||
|
import network.loki.messenger.libsession_util.util.ExpiryMode
|
||||||
|
import network.loki.messenger.libsession_util.util.GroupInfo
|
||||||
import network.loki.messenger.libsession_util.util.UserPic
|
import network.loki.messenger.libsession_util.util.UserPic
|
||||||
import network.loki.messenger.libsession_util.util.afterSend
|
import network.loki.messenger.libsession_util.util.afterSend
|
||||||
import org.session.libsession.database.StorageProtocol
|
import org.session.libsession.database.StorageProtocol
|
||||||
@ -30,7 +33,6 @@ import org.session.libsession.utilities.Address.Companion.fromSerialized
|
|||||||
import org.session.libsession.utilities.ConfigFactoryProtocol
|
import org.session.libsession.utilities.ConfigFactoryProtocol
|
||||||
import org.session.libsession.utilities.ConfigUpdateNotification
|
import org.session.libsession.utilities.ConfigUpdateNotification
|
||||||
import org.session.libsession.utilities.GroupUtil
|
import org.session.libsession.utilities.GroupUtil
|
||||||
import org.session.libsession.utilities.SSKEnvironment
|
|
||||||
import org.session.libsession.utilities.SSKEnvironment.ProfileManagerProtocol.Companion.NAME_PADDED_LENGTH
|
import org.session.libsession.utilities.SSKEnvironment.ProfileManagerProtocol.Companion.NAME_PADDED_LENGTH
|
||||||
import org.session.libsession.utilities.TextSecurePreferences
|
import org.session.libsession.utilities.TextSecurePreferences
|
||||||
import org.session.libsession.utilities.recipients.Recipient
|
import org.session.libsession.utilities.recipients.Recipient
|
||||||
@ -45,7 +47,7 @@ import org.thoughtcrime.securesms.database.ThreadDatabase
|
|||||||
import org.thoughtcrime.securesms.dependencies.PollerFactory
|
import org.thoughtcrime.securesms.dependencies.PollerFactory
|
||||||
import org.thoughtcrime.securesms.groups.ClosedGroupManager
|
import org.thoughtcrime.securesms.groups.ClosedGroupManager
|
||||||
import org.thoughtcrime.securesms.groups.OpenGroupManager
|
import org.thoughtcrime.securesms.groups.OpenGroupManager
|
||||||
import org.thoughtcrime.securesms.util.asSequence
|
import org.thoughtcrime.securesms.sskenvironment.ProfileManager
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
|
|
||||||
private const val TAG = "ConfigToDatabaseSync"
|
private const val TAG = "ConfigToDatabaseSync"
|
||||||
@ -66,6 +68,8 @@ class ConfigToDatabaseSync @Inject constructor(
|
|||||||
private val mmsDatabase: MmsDatabase,
|
private val mmsDatabase: MmsDatabase,
|
||||||
private val pollerFactory: PollerFactory,
|
private val pollerFactory: PollerFactory,
|
||||||
private val clock: SnodeClock,
|
private val clock: SnodeClock,
|
||||||
|
private val profileManager: ProfileManager,
|
||||||
|
private val preferences: TextSecurePreferences,
|
||||||
) {
|
) {
|
||||||
private var job: Job? = null
|
private var job: Job? = null
|
||||||
|
|
||||||
@ -74,79 +78,98 @@ class ConfigToDatabaseSync @Inject constructor(
|
|||||||
|
|
||||||
@Suppress("OPT_IN_USAGE")
|
@Suppress("OPT_IN_USAGE")
|
||||||
job = GlobalScope.launch {
|
job = GlobalScope.launch {
|
||||||
val groupMutex = hashMapOf<AccountId, Mutex>()
|
supervisorScope {
|
||||||
val userMutex = Mutex()
|
val job1 = async {
|
||||||
|
configFactory.configUpdateNotifications
|
||||||
configFactory.configUpdateNotifications.collect { notification ->
|
.filterIsInstance<ConfigUpdateNotification.UserConfigsMerged>()
|
||||||
when (notification) {
|
.debounce(800L)
|
||||||
is ConfigUpdateNotification.UserConfigs -> {
|
.collect { config ->
|
||||||
launch {
|
try {
|
||||||
userMutex.withLock {
|
Log.i(TAG, "Start syncing user configs")
|
||||||
syncUserConfigs()
|
syncUserConfigs(config.timestamp)
|
||||||
|
Log.i(TAG, "Finished syncing user configs")
|
||||||
|
} catch (e: Exception) {
|
||||||
|
Log.e(TAG, "Error syncing user configs", e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
is ConfigUpdateNotification.GroupConfigsUpdated -> {
|
val job2 = async {
|
||||||
val groupId = notification.groupId
|
configFactory.configUpdateNotifications
|
||||||
val mutex = groupMutex.getOrPut(groupId) { Mutex() }
|
.filterIsInstance<ConfigUpdateNotification.GroupConfigsUpdated>()
|
||||||
|
.collect {
|
||||||
launch {
|
syncGroupConfigs(it.groupId)
|
||||||
mutex.withLock {
|
|
||||||
syncGroupConfigs(groupId)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
is ConfigUpdateNotification.GroupConfigsDeleted -> {
|
job1.await()
|
||||||
groupMutex.remove(notification.groupId)
|
job2.await()
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun syncGroupConfigs(groupId: AccountId) {
|
private fun syncGroupConfigs(groupId: AccountId) {
|
||||||
configFactory.withGroupConfigs(groupId) {
|
val info = configFactory.withGroupConfigs(groupId) {
|
||||||
updateGroup(it.groupInfo)
|
UpdateGroupInfo(it.groupInfo)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun syncUserConfigs() {
|
updateGroup(info)
|
||||||
val messageTimestamp = clock.currentTimeMills()
|
}
|
||||||
|
|
||||||
|
private fun syncUserConfigs(updateTimestamp: Long) {
|
||||||
|
lateinit var updateUserInfo: UpdateUserInfo
|
||||||
|
lateinit var updateUserGroupsInfo: UpdateUserGroupsInfo
|
||||||
|
lateinit var updateContacts: List<Contact>
|
||||||
|
lateinit var updateConvoVolatile: List<Conversation?>
|
||||||
|
|
||||||
configFactory.withUserConfigs { configs ->
|
configFactory.withUserConfigs { configs ->
|
||||||
updateUser(configs.userProfile, messageTimestamp)
|
updateUserInfo = UpdateUserInfo(configs.userProfile)
|
||||||
updateUserGroups(configs.userGroups, messageTimestamp)
|
updateUserGroupsInfo = UpdateUserGroupsInfo(configs.userGroups)
|
||||||
updateContacts(configs.contacts, messageTimestamp)
|
updateContacts = configs.contacts.all()
|
||||||
updateConvoVolatile(configs.convoInfoVolatile)
|
updateConvoVolatile = configs.convoInfoVolatile.all()
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun updateUser(userProfile: ReadableUserProfile, messageTimestamp: Long) {
|
updateUser(updateUserInfo, updateTimestamp)
|
||||||
|
updateContacts(updateContacts, updateTimestamp)
|
||||||
|
updateUserGroups(updateUserGroupsInfo, updateTimestamp)
|
||||||
|
updateConvoVolatile(updateConvoVolatile)
|
||||||
|
}
|
||||||
|
|
||||||
|
private data class UpdateUserInfo(
|
||||||
|
val name: String?,
|
||||||
|
val userPic: UserPic,
|
||||||
|
val ntsPriority: Long,
|
||||||
|
val ntsExpiry: ExpiryMode
|
||||||
|
) {
|
||||||
|
constructor(profile: ReadableUserProfile) : this(
|
||||||
|
name = profile.getName(),
|
||||||
|
userPic = profile.getPic(),
|
||||||
|
ntsPriority = profile.getNtsPriority(),
|
||||||
|
ntsExpiry = profile.getNtsExpiry()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun updateUser(userProfile: UpdateUserInfo, messageTimestamp: Long) {
|
||||||
val userPublicKey = storage.getUserPublicKey() ?: return
|
val userPublicKey = storage.getUserPublicKey() ?: return
|
||||||
// would love to get rid of recipient and context from this
|
// would love to get rid of recipient and context from this
|
||||||
val recipient = Recipient.from(context, fromSerialized(userPublicKey), false)
|
val recipient = Recipient.from(context, fromSerialized(userPublicKey), false)
|
||||||
|
|
||||||
// Update profile name
|
// Update profile name
|
||||||
val name = userProfile.getName() ?: return
|
userProfile.name?.takeUnless { it.isEmpty() }?.truncate(NAME_PADDED_LENGTH)?.let {
|
||||||
val userPic = userProfile.getPic()
|
preferences.setProfileName(it)
|
||||||
val profileManager = SSKEnvironment.shared.profileManager
|
|
||||||
|
|
||||||
name.takeUnless { it.isEmpty() }?.truncate(NAME_PADDED_LENGTH)?.let {
|
|
||||||
TextSecurePreferences.setProfileName(context, it)
|
|
||||||
profileManager.setName(context, recipient, it)
|
profileManager.setName(context, recipient, it)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update profile picture
|
// Update profile picture
|
||||||
if (userPic == UserPic.DEFAULT) {
|
if (userProfile.userPic == UserPic.DEFAULT) {
|
||||||
storage.clearUserPic()
|
storage.clearUserPic(clearConfig = false)
|
||||||
} else if (userPic.key.isNotEmpty() && userPic.url.isNotEmpty()
|
} else if (userProfile.userPic.key.isNotEmpty() && userProfile.userPic.url.isNotEmpty()
|
||||||
&& TextSecurePreferences.getProfilePictureURL(context) != userPic.url
|
&& preferences.getProfilePictureURL() != userProfile.userPic.url
|
||||||
) {
|
) {
|
||||||
storage.setUserProfilePicture(userPic.url, userPic.key)
|
storage.setUserProfilePicture(userProfile.userPic.url, userProfile.userPic.key)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (userProfile.getNtsPriority() == PRIORITY_HIDDEN) {
|
if (userProfile.ntsPriority == PRIORITY_HIDDEN) {
|
||||||
// delete nts thread if needed
|
// delete nts thread if needed
|
||||||
val ourThread = storage.getThreadId(recipient) ?: return
|
val ourThread = storage.getThreadId(recipient) ?: return
|
||||||
storage.deleteConversation(ourThread)
|
storage.deleteConversation(ourThread)
|
||||||
@ -157,53 +180,77 @@ class ConfigToDatabaseSync @Inject constructor(
|
|||||||
storage.setThreadDate(it, 0)
|
storage.setThreadDate(it, 0)
|
||||||
}
|
}
|
||||||
threadDatabase.setHasSent(ourThread, true)
|
threadDatabase.setHasSent(ourThread, true)
|
||||||
storage.setPinned(ourThread, userProfile.getNtsPriority() > 0)
|
storage.setPinned(ourThread, userProfile.ntsPriority > 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set or reset the shared library to use latest expiration config
|
// Set or reset the shared library to use latest expiration config
|
||||||
storage.getThreadId(recipient)?.let {
|
storage.getThreadId(recipient)?.let {
|
||||||
storage.setExpirationConfiguration(
|
storage.setExpirationConfiguration(
|
||||||
storage.getExpirationConfiguration(it)?.takeIf { it.updatedTimestampMs > messageTimestamp } ?:
|
storage.getExpirationConfiguration(it)?.takeIf { it.updatedTimestampMs > messageTimestamp } ?:
|
||||||
ExpirationConfiguration(it, userProfile.getNtsExpiry(), messageTimestamp)
|
ExpirationConfiguration(it, userProfile.ntsExpiry, messageTimestamp)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun updateGroup(groupInfoConfig: ReadableGroupInfoConfig) {
|
private data class UpdateGroupInfo(
|
||||||
val threadId = storage.getThreadId(fromSerialized(groupInfoConfig.id().hexString)) ?: return
|
val id: AccountId,
|
||||||
|
val name: String?,
|
||||||
|
val deleteBefore: Long?,
|
||||||
|
val deleteAttachmentsBefore: Long?
|
||||||
|
) {
|
||||||
|
constructor(groupInfoConfig: ReadableGroupInfoConfig) : this(
|
||||||
|
id = groupInfoConfig.id(),
|
||||||
|
name = groupInfoConfig.getName(),
|
||||||
|
deleteBefore = groupInfoConfig.getDeleteBefore(),
|
||||||
|
deleteAttachmentsBefore = groupInfoConfig.getDeleteAttachmentsBefore()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun updateGroup(groupInfoConfig: UpdateGroupInfo) {
|
||||||
|
val threadId = storage.getThreadId(fromSerialized(groupInfoConfig.id.hexString)) ?: return
|
||||||
val recipient = storage.getRecipientForThread(threadId) ?: return
|
val recipient = storage.getRecipientForThread(threadId) ?: return
|
||||||
recipientDatabase.setProfileName(recipient, groupInfoConfig.getName())
|
recipientDatabase.setProfileName(recipient, groupInfoConfig.name)
|
||||||
groupInfoConfig.getDeleteBefore()?.let { removeBefore ->
|
profileManager.setName(context, recipient, groupInfoConfig.name ?: "")
|
||||||
|
groupInfoConfig.deleteBefore?.let { removeBefore ->
|
||||||
storage.trimThreadBefore(threadId, removeBefore)
|
storage.trimThreadBefore(threadId, removeBefore)
|
||||||
}
|
}
|
||||||
groupInfoConfig.getDeleteAttachmentsBefore()?.let { removeAttachmentsBefore ->
|
groupInfoConfig.deleteAttachmentsBefore?.let { removeAttachmentsBefore ->
|
||||||
mmsDatabase.deleteMessagesInThreadBeforeDate(threadId, removeAttachmentsBefore, onlyMedia = true)
|
mmsDatabase.deleteMessagesInThreadBeforeDate(threadId, removeAttachmentsBefore, onlyMedia = true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun updateContacts(contacts: ReadableContacts, messageTimestamp: Long) {
|
private fun updateContacts(contacts: List<Contact>, messageTimestamp: Long) {
|
||||||
val extracted = contacts.all().toList()
|
storage.addLibSessionContacts(contacts, messageTimestamp)
|
||||||
storage.addLibSessionContacts(extracted, messageTimestamp)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun updateUserGroups(userGroups: ReadableUserGroupsConfig, messageTimestamp: Long) {
|
private data class UpdateUserGroupsInfo(
|
||||||
|
val communityInfo: List<GroupInfo.CommunityGroupInfo>,
|
||||||
|
val legacyGroupInfo: List<GroupInfo.LegacyGroupInfo>,
|
||||||
|
val closedGroupInfo: List<GroupInfo.ClosedGroupInfo>
|
||||||
|
) {
|
||||||
|
constructor(userGroups: ReadableUserGroupsConfig) : this(
|
||||||
|
communityInfo = userGroups.allCommunityInfo(),
|
||||||
|
legacyGroupInfo = userGroups.allLegacyGroupInfo(),
|
||||||
|
closedGroupInfo = userGroups.allClosedGroupInfo()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun updateUserGroups(userGroups: UpdateUserGroupsInfo, messageTimestamp: Long) {
|
||||||
val localUserPublicKey = storage.getUserPublicKey() ?: return Log.w(
|
val localUserPublicKey = storage.getUserPublicKey() ?: return Log.w(
|
||||||
"Loki",
|
TAG,
|
||||||
"No user public key when trying to update user groups from config"
|
"No user public key when trying to update user groups from config"
|
||||||
)
|
)
|
||||||
val communities = userGroups.allCommunityInfo()
|
|
||||||
val lgc = userGroups.allLegacyGroupInfo()
|
|
||||||
val allOpenGroups = storage.getAllOpenGroups()
|
val allOpenGroups = storage.getAllOpenGroups()
|
||||||
val toDeleteCommunities = allOpenGroups.filter {
|
val toDeleteCommunities = allOpenGroups.filter {
|
||||||
Conversation.Community(BaseCommunityInfo(it.value.server, it.value.room, it.value.publicKey), 0, false).baseCommunityInfo.fullUrl() !in communities.map { it.community.fullUrl() }
|
Conversation.Community(BaseCommunityInfo(it.value.server, it.value.room, it.value.publicKey), 0, false).baseCommunityInfo.fullUrl() !in userGroups.communityInfo.map { it.community.fullUrl() }
|
||||||
}
|
}
|
||||||
|
|
||||||
val existingCommunities: Map<Long, OpenGroup> = allOpenGroups.filterKeys { it !in toDeleteCommunities.keys }
|
val existingCommunities: Map<Long, OpenGroup> = allOpenGroups.filterKeys { it !in toDeleteCommunities.keys }
|
||||||
val toAddCommunities = communities.filter { it.community.fullUrl() !in existingCommunities.map { it.value.joinURL } }
|
val toAddCommunities = userGroups.communityInfo.filter { it.community.fullUrl() !in existingCommunities.map { it.value.joinURL } }
|
||||||
val existingJoinUrls = existingCommunities.values.map { it.joinURL }
|
val existingJoinUrls = existingCommunities.values.map { it.joinURL }
|
||||||
|
|
||||||
val existingLegacyClosedGroups = storage.getAllGroups(includeInactive = true).filter { it.isLegacyClosedGroup }
|
val existingLegacyClosedGroups = storage.getAllGroups(includeInactive = true).filter { it.isLegacyClosedGroup }
|
||||||
val lgcIds = lgc.map { it.accountId }
|
val lgcIds = userGroups.legacyGroupInfo.map { it.accountId }
|
||||||
val toDeleteLegacyClosedGroups = existingLegacyClosedGroups.filter { group ->
|
val toDeleteLegacyClosedGroups = existingLegacyClosedGroups.filter { group ->
|
||||||
GroupUtil.doubleDecodeGroupId(group.encodedId) !in lgcIds
|
GroupUtil.doubleDecodeGroupId(group.encodedId) !in lgcIds
|
||||||
}
|
}
|
||||||
@ -228,7 +275,7 @@ class ConfigToDatabaseSync @Inject constructor(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (groupInfo in communities) {
|
for (groupInfo in userGroups.communityInfo) {
|
||||||
val groupBaseCommunity = groupInfo.community
|
val groupBaseCommunity = groupInfo.community
|
||||||
if (groupBaseCommunity.fullUrl() in existingJoinUrls) {
|
if (groupBaseCommunity.fullUrl() in existingJoinUrls) {
|
||||||
// add it
|
// add it
|
||||||
@ -237,7 +284,6 @@ class ConfigToDatabaseSync @Inject constructor(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val newClosedGroups = userGroups.allClosedGroupInfo()
|
|
||||||
val existingClosedGroupThreads: Map<AccountId, Long> = threadDatabase.readerFor(threadDatabase.conversationList).use { reader ->
|
val existingClosedGroupThreads: Map<AccountId, Long> = threadDatabase.readerFor(threadDatabase.conversationList).use { reader ->
|
||||||
buildMap(reader.count) {
|
buildMap(reader.count) {
|
||||||
var current = reader.next
|
var current = reader.next
|
||||||
@ -253,7 +299,7 @@ class ConfigToDatabaseSync @Inject constructor(
|
|||||||
|
|
||||||
val groupThreadsToKeep = hashMapOf<AccountId, Long>()
|
val groupThreadsToKeep = hashMapOf<AccountId, Long>()
|
||||||
|
|
||||||
for (closedGroup in newClosedGroups) {
|
for (closedGroup in userGroups.closedGroupInfo) {
|
||||||
val recipient = Recipient.from(context, fromSerialized(closedGroup.groupAccountId.hexString), false)
|
val recipient = Recipient.from(context, fromSerialized(closedGroup.groupAccountId.hexString), false)
|
||||||
storage.setRecipientApprovedMe(recipient, true)
|
storage.setRecipientApprovedMe(recipient, true)
|
||||||
storage.setRecipientApproved(recipient, !closedGroup.invited)
|
storage.setRecipientApproved(recipient, !closedGroup.invited)
|
||||||
@ -273,7 +319,7 @@ class ConfigToDatabaseSync @Inject constructor(
|
|||||||
storage.removeClosedGroupThread(threadId)
|
storage.removeClosedGroupThread(threadId)
|
||||||
}
|
}
|
||||||
|
|
||||||
for (group in lgc) {
|
for (group in userGroups.legacyGroupInfo) {
|
||||||
val groupId = GroupUtil.doubleEncodeGroupID(group.accountId)
|
val groupId = GroupUtil.doubleEncodeGroupID(group.accountId)
|
||||||
val existingGroup = existingLegacyClosedGroups.firstOrNull { GroupUtil.doubleDecodeGroupId(it.encodedId) == group.accountId }
|
val existingGroup = existingLegacyClosedGroups.firstOrNull { GroupUtil.doubleDecodeGroupId(it.encodedId) == group.accountId }
|
||||||
val existingThread = existingGroup?.let { storage.getThreadId(existingGroup.encodedId) }
|
val existingThread = existingGroup?.let { storage.getThreadId(existingGroup.encodedId) }
|
||||||
@ -282,9 +328,9 @@ class ConfigToDatabaseSync @Inject constructor(
|
|||||||
ClosedGroupManager.silentlyRemoveGroup(context,existingThread,
|
ClosedGroupManager.silentlyRemoveGroup(context,existingThread,
|
||||||
GroupUtil.doubleDecodeGroupId(existingGroup.encodedId), existingGroup.encodedId, localUserPublicKey, delete = true)
|
GroupUtil.doubleDecodeGroupId(existingGroup.encodedId), existingGroup.encodedId, localUserPublicKey, delete = true)
|
||||||
} else if (existingThread == null) {
|
} else if (existingThread == null) {
|
||||||
Log.w("Loki-DBG", "Existing group had no thread to hide")
|
Log.w(TAG, "Existing group had no thread to hide")
|
||||||
} else {
|
} else {
|
||||||
Log.d("Loki-DBG", "Setting existing group pinned status to ${group.priority}")
|
Log.d(TAG, "Setting existing group pinned status to ${group.priority}")
|
||||||
threadDatabase.setPinned(existingThread, group.priority == PRIORITY_PINNED)
|
threadDatabase.setPinned(existingThread, group.priority == PRIORITY_PINNED)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -322,8 +368,8 @@ class ConfigToDatabaseSync @Inject constructor(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun updateConvoVolatile(convos: ReadableConversationVolatileConfig) {
|
private fun updateConvoVolatile(convos: List<Conversation?>) {
|
||||||
val extracted = convos.all().filterNotNull()
|
val extracted = convos.filterNotNull()
|
||||||
for (conversation in extracted) {
|
for (conversation in extracted) {
|
||||||
val threadId = when (conversation) {
|
val threadId = when (conversation) {
|
||||||
is Conversation.OneToOne -> storage.getThreadIdFor(conversation.accountId, null, null, createThread = false)
|
is Conversation.OneToOne -> storage.getThreadIdFor(conversation.accountId, null, null, createThread = false)
|
||||||
@ -334,11 +380,11 @@ class ConfigToDatabaseSync @Inject constructor(
|
|||||||
if (threadId != null) {
|
if (threadId != null) {
|
||||||
if (conversation.lastRead > storage.getLastSeen(threadId)) {
|
if (conversation.lastRead > storage.getLastSeen(threadId)) {
|
||||||
storage.markConversationAsRead(threadId, conversation.lastRead, force = true)
|
storage.markConversationAsRead(threadId, conversation.lastRead, force = true)
|
||||||
}
|
|
||||||
storage.updateThread(threadId, false)
|
storage.updateThread(threadId, false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1,19 +1,22 @@
|
|||||||
package org.thoughtcrime.securesms.configs
|
package org.thoughtcrime.securesms.configs
|
||||||
|
|
||||||
import kotlinx.coroutines.DelicateCoroutinesApi
|
import kotlinx.coroutines.DelicateCoroutinesApi
|
||||||
|
import kotlinx.coroutines.FlowPreview
|
||||||
import kotlinx.coroutines.GlobalScope
|
import kotlinx.coroutines.GlobalScope
|
||||||
import kotlinx.coroutines.Job
|
import kotlinx.coroutines.Job
|
||||||
import kotlinx.coroutines.async
|
import kotlinx.coroutines.async
|
||||||
import kotlinx.coroutines.awaitAll
|
import kotlinx.coroutines.awaitAll
|
||||||
import kotlinx.coroutines.coroutineScope
|
import kotlinx.coroutines.coroutineScope
|
||||||
|
import kotlinx.coroutines.flow.debounce
|
||||||
|
import kotlinx.coroutines.flow.filterIsInstance
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
import kotlinx.coroutines.sync.Mutex
|
import kotlinx.coroutines.supervisorScope
|
||||||
import kotlinx.coroutines.sync.withLock
|
|
||||||
import network.loki.messenger.libsession_util.util.ConfigPush
|
import network.loki.messenger.libsession_util.util.ConfigPush
|
||||||
import org.session.libsession.database.StorageProtocol
|
import org.session.libsession.database.StorageProtocol
|
||||||
import org.session.libsession.database.userAuth
|
import org.session.libsession.database.userAuth
|
||||||
import org.session.libsession.snode.OwnedSwarmAuth
|
import org.session.libsession.snode.OwnedSwarmAuth
|
||||||
import org.session.libsession.snode.SnodeAPI
|
import org.session.libsession.snode.SnodeAPI
|
||||||
|
import org.session.libsession.snode.SnodeClock
|
||||||
import org.session.libsession.snode.SnodeMessage
|
import org.session.libsession.snode.SnodeMessage
|
||||||
import org.session.libsession.snode.SwarmAuth
|
import org.session.libsession.snode.SwarmAuth
|
||||||
import org.session.libsession.snode.model.StoreMessageResponse
|
import org.session.libsession.snode.model.StoreMessageResponse
|
||||||
@ -47,32 +50,39 @@ private const val TAG = "ConfigUploader"
|
|||||||
class ConfigUploader @Inject constructor(
|
class ConfigUploader @Inject constructor(
|
||||||
private val configFactory: ConfigFactoryProtocol,
|
private val configFactory: ConfigFactoryProtocol,
|
||||||
private val storageProtocol: StorageProtocol,
|
private val storageProtocol: StorageProtocol,
|
||||||
|
private val clock: SnodeClock,
|
||||||
) {
|
) {
|
||||||
private var job: Job? = null
|
private var job: Job? = null
|
||||||
|
|
||||||
@OptIn(DelicateCoroutinesApi::class)
|
@OptIn(DelicateCoroutinesApi::class, FlowPreview::class)
|
||||||
fun start() {
|
fun start() {
|
||||||
require(job == null) { "Already started" }
|
require(job == null) { "Already started" }
|
||||||
|
|
||||||
job = GlobalScope.launch {
|
job = GlobalScope.launch {
|
||||||
val groupMutex = hashMapOf<AccountId, Mutex>()
|
supervisorScope {
|
||||||
val userMutex = Mutex()
|
val job1 = launch {
|
||||||
|
|
||||||
configFactory.configUpdateNotifications
|
configFactory.configUpdateNotifications
|
||||||
.collect { changes ->
|
.filterIsInstance<ConfigUpdateNotification.UserConfigsModified>()
|
||||||
when (changes) {
|
.debounce(1000L)
|
||||||
is ConfigUpdateNotification.GroupConfigsDeleted -> {
|
.collect {
|
||||||
groupMutex.remove(changes.groupId)
|
|
||||||
}
|
|
||||||
|
|
||||||
is ConfigUpdateNotification.GroupConfigsUpdated -> {
|
|
||||||
// Group config pushing is limited to its own dispatcher
|
|
||||||
launch {
|
|
||||||
try {
|
try {
|
||||||
retryWithUniformInterval {
|
retryWithUniformInterval {
|
||||||
groupMutex.getOrPut(changes.groupId) { Mutex() }.withLock {
|
pushUserConfigChangesIfNeeded()
|
||||||
pushGroupConfigsChangesIfNeeded(changes.groupId)
|
|
||||||
}
|
}
|
||||||
|
} catch (e: Exception) {
|
||||||
|
Log.e(TAG, "Failed to push user configs", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val job2 = launch {
|
||||||
|
configFactory.configUpdateNotifications
|
||||||
|
.filterIsInstance<ConfigUpdateNotification.GroupConfigsUpdated>()
|
||||||
|
.debounce(1000L)
|
||||||
|
.collect { changes ->
|
||||||
|
try {
|
||||||
|
retryWithUniformInterval {
|
||||||
|
pushGroupConfigsChangesIfNeeded(changes.groupId)
|
||||||
}
|
}
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
Log.e(TAG, "Failed to push group configs", e)
|
Log.e(TAG, "Failed to push group configs", e)
|
||||||
@ -80,18 +90,8 @@ class ConfigUploader @Inject constructor(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ConfigUpdateNotification.UserConfigs -> launch {
|
job1.join()
|
||||||
try {
|
job2.join()
|
||||||
retryWithUniformInterval {
|
|
||||||
userMutex.withLock {
|
|
||||||
pushUserConfigChangesIfNeeded()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (e: Exception) {
|
|
||||||
Log.e(TAG, "Failed to push user configs", e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -160,7 +160,7 @@ class ConfigUploader @Inject constructor(
|
|||||||
auth.accountId.hexString,
|
auth.accountId.hexString,
|
||||||
Base64.encodeBytes(keysPush),
|
Base64.encodeBytes(keysPush),
|
||||||
SnodeMessage.CONFIG_TTL,
|
SnodeMessage.CONFIG_TTL,
|
||||||
SnodeAPI.nowWithOffset,
|
clock.currentTimeMills(),
|
||||||
),
|
),
|
||||||
auth
|
auth
|
||||||
),
|
),
|
||||||
@ -203,7 +203,7 @@ class ConfigUploader @Inject constructor(
|
|||||||
auth.accountId.hexString,
|
auth.accountId.hexString,
|
||||||
Base64.encodeBytes(push.config),
|
Base64.encodeBytes(push.config),
|
||||||
SnodeMessage.CONFIG_TTL,
|
SnodeMessage.CONFIG_TTL,
|
||||||
SnodeAPI.nowWithOffset,
|
clock.currentTimeMills(),
|
||||||
),
|
),
|
||||||
auth,
|
auth,
|
||||||
),
|
),
|
||||||
|
@ -257,8 +257,8 @@ open class Storage @Inject constructor(
|
|||||||
Recipient.from(context, it, false)
|
Recipient.from(context, it, false)
|
||||||
}
|
}
|
||||||
ourRecipient.resolve().profileKey = newProfileKey
|
ourRecipient.resolve().profileKey = newProfileKey
|
||||||
TextSecurePreferences.setProfileKey(context, newProfileKey?.let { Base64.encodeBytes(it) })
|
preferences.setProfileKey(newProfileKey?.let { Base64.encodeBytes(it) })
|
||||||
TextSecurePreferences.setProfilePictureURL(context, newProfilePicture)
|
preferences.setProfilePictureURL(newProfilePicture)
|
||||||
|
|
||||||
if (newProfileKey != null) {
|
if (newProfileKey != null) {
|
||||||
JobQueue.shared.add(RetrieveProfileAvatarJob(newProfilePicture, ourRecipient.address))
|
JobQueue.shared.add(RetrieveProfileAvatarJob(newProfilePicture, ourRecipient.address))
|
||||||
@ -533,22 +533,24 @@ open class Storage @Inject constructor(
|
|||||||
return configFactory.withUserConfigs { it.userProfile.getCommunityMessageRequests() }
|
return configFactory.withUserConfigs { it.userProfile.getCommunityMessageRequests() }
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun clearUserPic() {
|
override fun clearUserPic(clearConfig: Boolean) {
|
||||||
val userPublicKey = getUserPublicKey() ?: return Log.w(TAG, "No user public key when trying to clear user pic")
|
val userPublicKey = getUserPublicKey() ?: return Log.w(TAG, "No user public key when trying to clear user pic")
|
||||||
val recipient = Recipient.from(context, fromSerialized(userPublicKey), false)
|
val recipient = Recipient.from(context, fromSerialized(userPublicKey), false)
|
||||||
|
|
||||||
// Clear details related to the user's profile picture
|
// Clear details related to the user's profile picture
|
||||||
TextSecurePreferences.setProfileKey(context, null)
|
preferences.setProfileKey(null)
|
||||||
ProfileKeyUtil.setEncodedProfileKey(context, null)
|
ProfileKeyUtil.setEncodedProfileKey(context, null)
|
||||||
recipientDatabase.setProfileAvatar(recipient, null)
|
recipientDatabase.setProfileAvatar(recipient, null)
|
||||||
TextSecurePreferences.setProfileAvatarId(context, 0)
|
preferences.setProfileAvatarId(0)
|
||||||
TextSecurePreferences.setProfilePictureURL(context, null)
|
preferences.setProfilePictureURL(null)
|
||||||
|
|
||||||
Recipient.removeCached(fromSerialized(userPublicKey))
|
Recipient.removeCached(fromSerialized(userPublicKey))
|
||||||
|
if (clearConfig) {
|
||||||
configFactory.withMutableUserConfigs {
|
configFactory.withMutableUserConfigs {
|
||||||
it.userProfile.setPic(UserPic.DEFAULT)
|
it.userProfile.setPic(UserPic.DEFAULT)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override fun setAuthToken(room: String, server: String, newValue: String) {
|
override fun setAuthToken(room: String, server: String, newValue: String) {
|
||||||
val id = "$server.$room"
|
val id = "$server.$room"
|
||||||
|
@ -142,14 +142,14 @@ class ConfigFactory @Inject constructor(
|
|||||||
*
|
*
|
||||||
* @param cb A function that takes a [UserConfigsImpl] and returns a pair of the result of the operation and a boolean indicating if the configs were changed.
|
* @param cb A function that takes a [UserConfigsImpl] and returns a pair of the result of the operation and a boolean indicating if the configs were changed.
|
||||||
*/
|
*/
|
||||||
private fun <T> doWithMutableUserConfigs(cb: (UserConfigsImpl) -> Pair<T, Boolean>): T {
|
private fun <T> doWithMutableUserConfigs(cb: (UserConfigsImpl) -> Pair<T, ConfigUpdateNotification?>): T {
|
||||||
val (lock, configs) = ensureUserConfigsInitialized()
|
val (lock, configs) = ensureUserConfigsInitialized()
|
||||||
val (result, changed) = lock.write {
|
val (result, changed) = lock.write {
|
||||||
cb(configs)
|
cb(configs)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (changed) {
|
if (changed != null) {
|
||||||
_configUpdateNotifications.tryEmit(ConfigUpdateNotification.UserConfigs)
|
_configUpdateNotifications.tryEmit(changed)
|
||||||
}
|
}
|
||||||
|
|
||||||
return result
|
return result
|
||||||
@ -171,13 +171,25 @@ class ConfigFactory @Inject constructor(
|
|||||||
UserConfigType.USER_GROUPS -> configs.userGroups
|
UserConfigType.USER_GROUPS -> configs.userGroups
|
||||||
}
|
}
|
||||||
|
|
||||||
Unit to config.merge(messages.map { it.hash to it.data }.toTypedArray()).isNotEmpty()
|
val maxTimestamp = config.merge(messages.map { it.hash to it.data }.toTypedArray())
|
||||||
|
.asSequence()
|
||||||
|
.mapNotNull { hash -> messages.firstOrNull { it.hash == hash } }
|
||||||
|
.maxOfOrNull { it.timestamp }
|
||||||
|
|
||||||
|
Unit to maxTimestamp?.let(ConfigUpdateNotification::UserConfigsMerged)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun <T> withMutableUserConfigs(cb: (MutableUserConfigs) -> T): T {
|
override fun <T> withMutableUserConfigs(cb: (MutableUserConfigs) -> T): T {
|
||||||
return doWithMutableUserConfigs {
|
return doWithMutableUserConfigs {
|
||||||
cb(it) to it.persistIfDirty(clock)
|
val result = cb(it)
|
||||||
|
val changed = if (it.persistIfDirty(clock)) {
|
||||||
|
ConfigUpdateNotification.UserConfigsModified
|
||||||
|
} else {
|
||||||
|
null
|
||||||
|
}
|
||||||
|
|
||||||
|
result to changed
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -229,10 +241,6 @@ class ConfigFactory @Inject constructor(
|
|||||||
it.convoInfoVolatile.eraseClosedGroup(groupId.hexString)
|
it.convoInfoVolatile.eraseClosedGroup(groupId.hexString)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (groupConfigs.remove(groupId) != null) {
|
|
||||||
_configUpdateNotifications.tryEmit(ConfigUpdateNotification.GroupConfigsDeleted(groupId))
|
|
||||||
}
|
|
||||||
|
|
||||||
configDatabase.deleteGroupConfigs(groupId)
|
configDatabase.deleteGroupConfigs(groupId)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -291,7 +299,9 @@ class ConfigFactory @Inject constructor(
|
|||||||
convoInfoVolatile?.let { (push, result) -> configs.convoInfoVolatile.confirmPushed(push.seqNo, result.hash) }
|
convoInfoVolatile?.let { (push, result) -> configs.convoInfoVolatile.confirmPushed(push.seqNo, result.hash) }
|
||||||
userGroups?.let { (push, result) -> configs.userGroups.confirmPushed(push.seqNo, result.hash) }
|
userGroups?.let { (push, result) -> configs.userGroups.confirmPushed(push.seqNo, result.hash) }
|
||||||
|
|
||||||
Unit to configs.persistIfDirty(clock)
|
configs.persistIfDirty(clock)
|
||||||
|
|
||||||
|
Unit to null
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -385,7 +395,7 @@ class ConfigFactory @Inject constructor(
|
|||||||
}
|
}
|
||||||
|
|
||||||
fun clearAll() {
|
fun clearAll() {
|
||||||
//TODO: clear all configsr
|
//TODO: clear all configs
|
||||||
}
|
}
|
||||||
|
|
||||||
private class GroupSubAccountSwarmAuth(
|
private class GroupSubAccountSwarmAuth(
|
||||||
|
@ -16,6 +16,7 @@ import kotlinx.coroutines.flow.asStateFlow
|
|||||||
import kotlinx.coroutines.flow.buffer
|
import kotlinx.coroutines.flow.buffer
|
||||||
import kotlinx.coroutines.flow.collectLatest
|
import kotlinx.coroutines.flow.collectLatest
|
||||||
import kotlinx.coroutines.flow.filter
|
import kotlinx.coroutines.flow.filter
|
||||||
|
import kotlinx.coroutines.flow.filterIsInstance
|
||||||
import kotlinx.coroutines.flow.first
|
import kotlinx.coroutines.flow.first
|
||||||
import kotlinx.coroutines.flow.flatMapLatest
|
import kotlinx.coroutines.flow.flatMapLatest
|
||||||
import kotlinx.coroutines.flow.flow
|
import kotlinx.coroutines.flow.flow
|
||||||
@ -73,8 +74,8 @@ internal class LoadingViewModel @Inject constructor(
|
|||||||
viewModelScope.launch {
|
viewModelScope.launch {
|
||||||
try {
|
try {
|
||||||
configFactory.configUpdateNotifications
|
configFactory.configUpdateNotifications
|
||||||
.filter { it == ConfigUpdateNotification.UserConfigs }
|
.filterIsInstance<ConfigUpdateNotification.UserConfigsModified>()
|
||||||
.onStart { emit(ConfigUpdateNotification.UserConfigs) }
|
.onStart { emit(ConfigUpdateNotification.UserConfigsModified) }
|
||||||
.filter {
|
.filter {
|
||||||
configFactory.withUserConfigs { configs ->
|
configFactory.withUserConfigs { configs ->
|
||||||
!configs.userProfile.getName().isNullOrEmpty()
|
!configs.userProfile.getName().isNullOrEmpty()
|
||||||
|
@ -55,7 +55,7 @@ interface StorageProtocol {
|
|||||||
fun setProfilePicture(recipient: Recipient, newProfilePicture: String?, newProfileKey: ByteArray?)
|
fun setProfilePicture(recipient: Recipient, newProfilePicture: String?, newProfileKey: ByteArray?)
|
||||||
fun setBlocksCommunityMessageRequests(recipient: Recipient, blocksMessageRequests: Boolean)
|
fun setBlocksCommunityMessageRequests(recipient: Recipient, blocksMessageRequests: Boolean)
|
||||||
fun setUserProfilePicture(newProfilePicture: String?, newProfileKey: ByteArray?)
|
fun setUserProfilePicture(newProfilePicture: String?, newProfileKey: ByteArray?)
|
||||||
fun clearUserPic()
|
fun clearUserPic(clearConfig: Boolean = true)
|
||||||
// Signal
|
// Signal
|
||||||
fun getOrGenerateRegistrationID(): Int
|
fun getOrGenerateRegistrationID(): Int
|
||||||
|
|
||||||
|
@ -115,8 +115,8 @@ suspend fun ConfigFactoryProtocol.waitUntilUserConfigsPushed(timeoutMills: Long
|
|||||||
|
|
||||||
return withTimeoutOrNull(timeoutMills){
|
return withTimeoutOrNull(timeoutMills){
|
||||||
configUpdateNotifications
|
configUpdateNotifications
|
||||||
.onStart { emit(ConfigUpdateNotification.UserConfigs) } // Trigger the filtering immediately
|
.onStart { emit(ConfigUpdateNotification.UserConfigsModified) } // Trigger the filtering immediately
|
||||||
.filter { it == ConfigUpdateNotification.UserConfigs && !needsPush() }
|
.filter { it == ConfigUpdateNotification.UserConfigsModified && !needsPush() }
|
||||||
.first()
|
.first()
|
||||||
} != null
|
} != null
|
||||||
}
|
}
|
||||||
@ -190,8 +190,10 @@ interface MutableGroupConfigs : GroupConfigs {
|
|||||||
fun rekey()
|
fun rekey()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
sealed interface ConfigUpdateNotification {
|
sealed interface ConfigUpdateNotification {
|
||||||
data object UserConfigs : ConfigUpdateNotification
|
data object UserConfigsModified : ConfigUpdateNotification
|
||||||
|
data class UserConfigsMerged(val timestamp: Long) : ConfigUpdateNotification
|
||||||
|
|
||||||
data class GroupConfigsUpdated(val groupId: AccountId) : ConfigUpdateNotification
|
data class GroupConfigsUpdated(val groupId: AccountId) : ConfigUpdateNotification
|
||||||
data class GroupConfigsDeleted(val groupId: AccountId) : ConfigUpdateNotification
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user