From 11c1fd382d3ca48564198dfb17388d9d4aee2f00 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Wed, 7 Jun 2023 13:31:22 +1000 Subject: [PATCH 1/2] Fixed a few issues with the OpenGroupPoller Fixed an issue where the admin/moderator status wasn't getting stored if set before joining a community Fixed an issue where multiple pollers for the same server could run at the same time when joining multiple rooms within the same app run (very noticeable when restoring/linking) --- .../securesms/groups/OpenGroupManager.kt | 25 ++- .../messaging/jobs/BackgroundGroupAddJob.kt | 8 +- .../messaging/open_groups/OpenGroupApi.kt | 21 +- .../pollers/OpenGroupPoller.kt | 180 ++++++++++-------- 4 files changed, 134 insertions(+), 100 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/OpenGroupManager.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/OpenGroupManager.kt index ef47269107..6a342eacda 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/OpenGroupManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/OpenGroupManager.kt @@ -41,11 +41,13 @@ object OpenGroupManager { isPolling = true val storage = MessagingModuleConfiguration.shared.storage val servers = storage.getAllOpenGroups().values.map { it.server }.toSet() - servers.forEach { server -> - pollers[server]?.stop() // Shouldn't be necessary - val poller = OpenGroupPoller(server, executorService) - poller.startIfNeeded() - pollers[server] = poller + synchronized(pollUpdaterLock) { + servers.forEach { server -> + pollers[server]?.stop() // Shouldn't be necessary + val poller = OpenGroupPoller(server, executorService) + poller.startIfNeeded() + pollers[server] = poller + } } } @@ -60,7 +62,7 @@ object OpenGroupManager { @WorkerThread fun add(server: String, room: String, publicKey: String, context: Context): OpenGroupApi.RoomInfo? { val openGroupID = "$server.$room" - var threadID = GroupManager.getOpenGroupThreadID(openGroupID, context) + val threadID = GroupManager.getOpenGroupThreadID(openGroupID, context) val storage = MessagingModuleConfiguration.shared.storage val threadDB = DatabaseComponent.get(context).lokiThreadDatabase() // Check it it's added already @@ -76,13 +78,16 @@ object OpenGroupManager { // Get capabilities & room info val (capabilities, info) = OpenGroupApi.getCapabilitiesAndRoomInfo(room, server).get() storage.setServerCapabilities(server, capabilities.capabilities) - storage.setUserCount(room, server, info.activeUsers) // Create the group locally if not available already if (threadID < 0) { - threadID = GroupManager.createOpenGroup(openGroupID, context, null, info.name).threadId + GroupManager.createOpenGroup(openGroupID, context, null, info.name) } - val openGroup = OpenGroup(server = server, room = room, publicKey = publicKey, name = info.name, imageId = info.imageId, canWrite = info.write, infoUpdates = info.infoUpdates) - threadDB.setOpenGroupChat(openGroup, threadID) + OpenGroupPoller.handleRoomPollInfo( + server = server, + roomToken = room, + pollInfo = info.toPollInfo(), + createGroupIfMissingWithPublicKey = publicKey + ) return info } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/BackgroundGroupAddJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/BackgroundGroupAddJob.kt index 5154101328..c5ec1bc74e 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/BackgroundGroupAddJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/BackgroundGroupAddJob.kt @@ -39,13 +39,7 @@ class BackgroundGroupAddJob(val joinUrl: String): Job { delegate?.handleJobFailed(this, dispatcherName, DuplicateGroupException()) return } - // get image - storage.setOpenGroupPublicKey(openGroup.server, openGroup.serverPublicKey) - val info = storage.addOpenGroup(openGroup.joinUrl()) - val imageId = info?.imageId - if (imageId != null && storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, imageId) == null) { - JobQueue.shared.add(GroupAvatarDownloadJob(openGroup.server, openGroup.room, imageId)) - } + storage.addOpenGroup(openGroup.joinUrl()) Log.d(KEY, "onOpenGroupAdded(${openGroup.server})") storage.onOpenGroupAdded(openGroup.server) } catch (e: Exception) { diff --git a/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupApi.kt b/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupApi.kt index ca60fd3cbf..a05addcee2 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupApi.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupApi.kt @@ -109,7 +109,26 @@ object OpenGroupApi { val defaultWrite: Boolean = false, val upload: Boolean = false, val defaultUpload: Boolean = false, - ) + ) { + fun toPollInfo(): RoomPollInfo { + return RoomPollInfo( + token = token, + activeUsers = activeUsers, + admin = admin, + globalAdmin = globalAdmin, + moderator = moderator, + globalModerator = globalModerator, + read = read, + defaultRead = defaultRead, + defaultAccessible = defaultAccessible, + write = write, + defaultWrite = defaultWrite, + upload = upload, + defaultUpload = defaultUpload, + details = this + ) + } + } @JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy::class) data class PinnedMessage( diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt index 562ddda699..fb7b3b6c8a 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt @@ -30,6 +30,7 @@ import org.session.libsignal.protos.SignalServiceProtos import org.session.libsignal.utilities.Base64 import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.successBackground +import java.util.UUID import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledFuture import java.util.concurrent.TimeUnit @@ -39,15 +40,97 @@ class OpenGroupPoller(private val server: String, private val executorService: S var isCaughtUp = false var secondToLastJob: MessageReceiveJob? = null private var future: ScheduledFuture<*>? = null + private var runId: UUID = UUID.randomUUID() companion object { private const val pollInterval: Long = 4000L const val maxInactivityPeriod = 14 * 24 * 60 * 60 * 1000 + + public fun handleRoomPollInfo( + server: String, + roomToken: String, + pollInfo: OpenGroupApi.RoomPollInfo, + createGroupIfMissingWithPublicKey: String? = null + ) { + val storage = MessagingModuleConfiguration.shared.storage + val groupId = "$server.$roomToken" + val dbGroupId = GroupUtil.getEncodedOpenGroupID(groupId.toByteArray()) + val existingOpenGroup = storage.getOpenGroup(roomToken, server) + + // If we don't have an existing group and don't have a 'createGroupIfMissingWithPublicKey' + // value then don't process the poll info + val publicKey = ((existingOpenGroup?.publicKey ?: createGroupIfMissingWithPublicKey) ?: return) + + val openGroup = OpenGroup( + server = server, + room = pollInfo.token, + name = ((pollInfo.details?.name ?: existingOpenGroup?.name) ?: ""), + publicKey = publicKey, + imageId = (pollInfo.details?.imageId ?: existingOpenGroup?.imageId), + canWrite = pollInfo.write, + infoUpdates = ((pollInfo.details?.infoUpdates ?: existingOpenGroup?.infoUpdates) ?: 0) + ) + // - Open Group changes + storage.updateOpenGroup(openGroup) + + // - User Count + storage.setUserCount(roomToken, server, pollInfo.activeUsers) + + // - Moderators + pollInfo.details?.moderators?.let { moderatorList -> + storage.setGroupMemberRoles(moderatorList.map { + GroupMember(groupId, it, GroupMemberRole.MODERATOR) + }) + } + pollInfo.details?.hiddenModerators?.let { moderatorList -> + storage.setGroupMemberRoles(moderatorList.map { + GroupMember(groupId, it, GroupMemberRole.HIDDEN_MODERATOR) + }) + } + // - Admins + pollInfo.details?.admins?.let { moderatorList -> + storage.setGroupMemberRoles(moderatorList.map { + GroupMember(groupId, it, GroupMemberRole.ADMIN) + }) + } + pollInfo.details?.hiddenAdmins?.let { moderatorList -> + storage.setGroupMemberRoles(moderatorList.map { + GroupMember(groupId, it, GroupMemberRole.HIDDEN_ADMIN) + }) + } + + // Update the group avatar + if ( + ( + pollInfo.details != null && + pollInfo.details.imageId != null && ( + pollInfo.details.imageId != existingOpenGroup?.imageId || + !storage.hasDownloadedProfilePicture(dbGroupId) + ) && + storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, pollInfo.details.imageId) == null + ) || ( + pollInfo.details == null && + existingOpenGroup?.imageId != null && + !storage.hasDownloadedProfilePicture(dbGroupId) && + storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, existingOpenGroup.imageId) == null + ) + ) { + JobQueue.shared.add(GroupAvatarDownloadJob(server, roomToken, openGroup.imageId)) + } + else if ( + pollInfo.details != null && + pollInfo.details.imageId == null && + existingOpenGroup?.imageId != null + ) { + storage.removeProfilePicture(dbGroupId) + } + } } fun startIfNeeded() { if (hasStarted) { return } hasStarted = true + runId = UUID.randomUUID() future = executorService?.schedule(::poll, 0, TimeUnit.MILLISECONDS) } @@ -57,6 +140,7 @@ class OpenGroupPoller(private val server: String, private val executorService: S } fun poll(isPostCapabilitiesRetry: Boolean = false): Promise { + val currentRunId = runId val storage = MessagingModuleConfiguration.shared.storage val rooms = storage.getAllOpenGroups().values.filter { it.server == server }.map { it.room } @@ -86,22 +170,30 @@ class OpenGroupPoller(private val server: String, private val executorService: S isCaughtUp = true } } - executorService?.schedule(this@OpenGroupPoller::poll, pollInterval, TimeUnit.MILLISECONDS) + + // Only poll again if it's the same poller run + if (currentRunId == runId) { + future = executorService?.schedule(this@OpenGroupPoller::poll, pollInterval, TimeUnit.MILLISECONDS) + } }.fail { - updateCapabilitiesIfNeeded(isPostCapabilitiesRetry, it) + updateCapabilitiesIfNeeded(isPostCapabilitiesRetry, currentRunId, it) }.map { } } - private fun updateCapabilitiesIfNeeded(isPostCapabilitiesRetry: Boolean, exception: Exception) { + private fun updateCapabilitiesIfNeeded(isPostCapabilitiesRetry: Boolean, currentRunId: UUID, exception: Exception) { if (exception is OnionRequestAPI.HTTPRequestFailedBlindingRequiredException) { if (!isPostCapabilitiesRetry) { OpenGroupApi.getCapabilities(server).map { handleCapabilities(server, it) } - executorService?.schedule({ poll(isPostCapabilitiesRetry = true) }, pollInterval, TimeUnit.MILLISECONDS) + + // Only poll again if it's the same poller run + if (currentRunId == runId) { + future = executorService?.schedule({ poll(isPostCapabilitiesRetry = true) }, pollInterval, TimeUnit.MILLISECONDS) + } } - } else { - executorService?.schedule(this@OpenGroupPoller::poll, pollInterval, TimeUnit.MILLISECONDS) + } else if (currentRunId == runId) { + future = executorService?.schedule(this@OpenGroupPoller::poll, pollInterval, TimeUnit.MILLISECONDS) } } @@ -110,82 +202,6 @@ class OpenGroupPoller(private val server: String, private val executorService: S storage.setServerCapabilities(server, capabilities.capabilities) } - private fun handleRoomPollInfo( - server: String, - roomToken: String, - pollInfo: OpenGroupApi.RoomPollInfo - ) { - val storage = MessagingModuleConfiguration.shared.storage - val groupId = "$server.$roomToken" - val dbGroupId = GroupUtil.getEncodedOpenGroupID(groupId.toByteArray()) - - val existingOpenGroup = storage.getOpenGroup(roomToken, server) - val publicKey = existingOpenGroup?.publicKey ?: return - val openGroup = OpenGroup( - server = server, - room = pollInfo.token, - name = if (pollInfo.details != null) { pollInfo.details.name } else { existingOpenGroup.name }, - publicKey = publicKey, - imageId = if (pollInfo.details != null) { pollInfo.details.imageId } else { existingOpenGroup.imageId }, - canWrite = pollInfo.write, - infoUpdates = if (pollInfo.details != null) { pollInfo.details.infoUpdates } else { existingOpenGroup.infoUpdates } - ) - // - Open Group changes - storage.updateOpenGroup(openGroup) - - // - User Count - storage.setUserCount(roomToken, server, pollInfo.activeUsers) - - // - Moderators - pollInfo.details?.moderators?.let { moderatorList -> - storage.setGroupMemberRoles(moderatorList.map { - GroupMember(groupId, it, GroupMemberRole.MODERATOR) - }) - } - pollInfo.details?.hiddenModerators?.let { moderatorList -> - storage.setGroupMemberRoles(moderatorList.map { - GroupMember(groupId, it, GroupMemberRole.HIDDEN_MODERATOR) - }) - } - // - Admins - pollInfo.details?.admins?.let { moderatorList -> - storage.setGroupMemberRoles(moderatorList.map { - GroupMember(groupId, it, GroupMemberRole.ADMIN) - }) - } - pollInfo.details?.hiddenAdmins?.let { moderatorList -> - storage.setGroupMemberRoles(moderatorList.map { - GroupMember(groupId, it, GroupMemberRole.HIDDEN_ADMIN) - }) - } - - // Update the group avatar - if ( - ( - pollInfo.details != null && - pollInfo.details.imageId != null && ( - pollInfo.details.imageId != existingOpenGroup.imageId || - !storage.hasDownloadedProfilePicture(dbGroupId) - ) && - storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, pollInfo.details.imageId) == null - ) || ( - pollInfo.details == null && - existingOpenGroup.imageId != null && - !storage.hasDownloadedProfilePicture(dbGroupId) && - storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, existingOpenGroup.imageId) == null - ) - ) { - JobQueue.shared.add(GroupAvatarDownloadJob(server, roomToken, existingOpenGroup.imageId)) - } - else if ( - pollInfo.details != null && - pollInfo.details.imageId == null && - existingOpenGroup.imageId != null - ) { - storage.removeProfilePicture(dbGroupId) - } - } - private fun handleMessages( server: String, roomToken: String, From 7699e47f7be4f715a3b0a72cb7dcd257c9864018 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Wed, 7 Jun 2023 15:02:32 +1000 Subject: [PATCH 2/2] Responded to PR comments --- .../securesms/groups/OpenGroupManager.kt | 6 ++-- .../messaging/open_groups/OpenGroupApi.kt | 34 +++++++++---------- .../pollers/OpenGroupPoller.kt | 12 ++++--- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/OpenGroupManager.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/OpenGroupManager.kt index 6a342eacda..dbdf2615ae 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/OpenGroupManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/OpenGroupManager.kt @@ -15,7 +15,7 @@ import java.util.concurrent.Executors object OpenGroupManager { private val executorService = Executors.newScheduledThreadPool(4) - private var pollers = mutableMapOf() // One for each server + private val pollers = mutableMapOf() // One for each server private var isPolling = false private val pollUpdaterLock = Any() @@ -44,9 +44,7 @@ object OpenGroupManager { synchronized(pollUpdaterLock) { servers.forEach { server -> pollers[server]?.stop() // Shouldn't be necessary - val poller = OpenGroupPoller(server, executorService) - poller.startIfNeeded() - pollers[server] = poller + pollers[server] = OpenGroupPoller(server, executorService).apply { startIfNeeded() } } } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupApi.kt b/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupApi.kt index a05addcee2..dc6d1475f8 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupApi.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupApi.kt @@ -110,24 +110,22 @@ object OpenGroupApi { val upload: Boolean = false, val defaultUpload: Boolean = false, ) { - fun toPollInfo(): RoomPollInfo { - return RoomPollInfo( - token = token, - activeUsers = activeUsers, - admin = admin, - globalAdmin = globalAdmin, - moderator = moderator, - globalModerator = globalModerator, - read = read, - defaultRead = defaultRead, - defaultAccessible = defaultAccessible, - write = write, - defaultWrite = defaultWrite, - upload = upload, - defaultUpload = defaultUpload, - details = this - ) - } + fun toPollInfo() = RoomPollInfo( + token = token, + activeUsers = activeUsers, + admin = admin, + globalAdmin = globalAdmin, + moderator = moderator, + globalModerator = globalModerator, + read = read, + defaultRead = defaultRead, + defaultAccessible = defaultAccessible, + write = write, + defaultWrite = defaultWrite, + upload = upload, + defaultUpload = defaultUpload, + details = this + ) } @JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy::class) diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt index fb7b3b6c8a..387381c9cc 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt @@ -40,7 +40,7 @@ class OpenGroupPoller(private val server: String, private val executorService: S var isCaughtUp = false var secondToLastJob: MessageReceiveJob? = null private var future: ScheduledFuture<*>? = null - private var runId: UUID = UUID.randomUUID() + @Volatile private var runId: UUID = UUID.randomUUID() companion object { private const val pollInterval: Long = 4000L @@ -59,16 +59,20 @@ class OpenGroupPoller(private val server: String, private val executorService: S // If we don't have an existing group and don't have a 'createGroupIfMissingWithPublicKey' // value then don't process the poll info - val publicKey = ((existingOpenGroup?.publicKey ?: createGroupIfMissingWithPublicKey) ?: return) + val publicKey = existingOpenGroup?.publicKey ?: createGroupIfMissingWithPublicKey + val name = pollInfo.details?.name ?: existingOpenGroup?.name + val infoUpdates = pollInfo.details?.infoUpdates ?: existingOpenGroup?.infoUpdates + + if (publicKey == null) return val openGroup = OpenGroup( server = server, room = pollInfo.token, - name = ((pollInfo.details?.name ?: existingOpenGroup?.name) ?: ""), + name = name ?: "", publicKey = publicKey, imageId = (pollInfo.details?.imageId ?: existingOpenGroup?.imageId), canWrite = pollInfo.write, - infoUpdates = ((pollInfo.details?.infoUpdates ?: existingOpenGroup?.infoUpdates) ?: 0) + infoUpdates = infoUpdates ?: 0 ) // - Open Group changes storage.updateOpenGroup(openGroup)