Merge pull request #1241 from mpretty-cyro/fix/open-group-polling-bugs

Fixed a few issues with the OpenGroupPoller
This commit is contained in:
Morgan Pretty 2023-06-08 16:53:40 +10:00 committed by GitHub
commit a957e78aac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 135 additions and 101 deletions

View File

@ -15,7 +15,7 @@ import java.util.concurrent.Executors
object OpenGroupManager {
private val executorService = Executors.newScheduledThreadPool(4)
private var pollers = mutableMapOf<String, OpenGroupPoller>() // One for each server
private val pollers = mutableMapOf<String, OpenGroupPoller>() // One for each server
private var isPolling = false
private val pollUpdaterLock = Any()
@ -41,11 +41,11 @@ object OpenGroupManager {
isPolling = true
val storage = MessagingModuleConfiguration.shared.storage
val servers = storage.getAllOpenGroups().values.map { it.server }.toSet()
servers.forEach { server ->
pollers[server]?.stop() // Shouldn't be necessary
val poller = OpenGroupPoller(server, executorService)
poller.startIfNeeded()
pollers[server] = poller
synchronized(pollUpdaterLock) {
servers.forEach { server ->
pollers[server]?.stop() // Shouldn't be necessary
pollers[server] = OpenGroupPoller(server, executorService).apply { startIfNeeded() }
}
}
}
@ -60,7 +60,7 @@ object OpenGroupManager {
@WorkerThread
fun add(server: String, room: String, publicKey: String, context: Context): OpenGroupApi.RoomInfo? {
val openGroupID = "$server.$room"
var threadID = GroupManager.getOpenGroupThreadID(openGroupID, context)
val threadID = GroupManager.getOpenGroupThreadID(openGroupID, context)
val storage = MessagingModuleConfiguration.shared.storage
val threadDB = DatabaseComponent.get(context).lokiThreadDatabase()
// Check it it's added already
@ -76,13 +76,16 @@ object OpenGroupManager {
// Get capabilities & room info
val (capabilities, info) = OpenGroupApi.getCapabilitiesAndRoomInfo(room, server).get()
storage.setServerCapabilities(server, capabilities.capabilities)
storage.setUserCount(room, server, info.activeUsers)
// Create the group locally if not available already
if (threadID < 0) {
threadID = GroupManager.createOpenGroup(openGroupID, context, null, info.name).threadId
GroupManager.createOpenGroup(openGroupID, context, null, info.name)
}
val openGroup = OpenGroup(server = server, room = room, publicKey = publicKey, name = info.name, imageId = info.imageId, canWrite = info.write, infoUpdates = info.infoUpdates)
threadDB.setOpenGroupChat(openGroup, threadID)
OpenGroupPoller.handleRoomPollInfo(
server = server,
roomToken = room,
pollInfo = info.toPollInfo(),
createGroupIfMissingWithPublicKey = publicKey
)
return info
}

View File

@ -39,13 +39,7 @@ class BackgroundGroupAddJob(val joinUrl: String): Job {
delegate?.handleJobFailed(this, dispatcherName, DuplicateGroupException())
return
}
// get image
storage.setOpenGroupPublicKey(openGroup.server, openGroup.serverPublicKey)
val info = storage.addOpenGroup(openGroup.joinUrl())
val imageId = info?.imageId
if (imageId != null && storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, imageId) == null) {
JobQueue.shared.add(GroupAvatarDownloadJob(openGroup.server, openGroup.room, imageId))
}
storage.addOpenGroup(openGroup.joinUrl())
Log.d(KEY, "onOpenGroupAdded(${openGroup.server})")
storage.onOpenGroupAdded(openGroup.server)
} catch (e: Exception) {

View File

@ -109,7 +109,24 @@ object OpenGroupApi {
val defaultWrite: Boolean = false,
val upload: Boolean = false,
val defaultUpload: Boolean = false,
)
) {
fun toPollInfo() = RoomPollInfo(
token = token,
activeUsers = activeUsers,
admin = admin,
globalAdmin = globalAdmin,
moderator = moderator,
globalModerator = globalModerator,
read = read,
defaultRead = defaultRead,
defaultAccessible = defaultAccessible,
write = write,
defaultWrite = defaultWrite,
upload = upload,
defaultUpload = defaultUpload,
details = this
)
}
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy::class)
data class PinnedMessage(

View File

@ -30,6 +30,7 @@ import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.successBackground
import java.util.UUID
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
@ -39,15 +40,101 @@ class OpenGroupPoller(private val server: String, private val executorService: S
var isCaughtUp = false
var secondToLastJob: MessageReceiveJob? = null
private var future: ScheduledFuture<*>? = null
@Volatile private var runId: UUID = UUID.randomUUID()
companion object {
private const val pollInterval: Long = 4000L
const val maxInactivityPeriod = 14 * 24 * 60 * 60 * 1000
public fun handleRoomPollInfo(
server: String,
roomToken: String,
pollInfo: OpenGroupApi.RoomPollInfo,
createGroupIfMissingWithPublicKey: String? = null
) {
val storage = MessagingModuleConfiguration.shared.storage
val groupId = "$server.$roomToken"
val dbGroupId = GroupUtil.getEncodedOpenGroupID(groupId.toByteArray())
val existingOpenGroup = storage.getOpenGroup(roomToken, server)
// If we don't have an existing group and don't have a 'createGroupIfMissingWithPublicKey'
// value then don't process the poll info
val publicKey = existingOpenGroup?.publicKey ?: createGroupIfMissingWithPublicKey
val name = pollInfo.details?.name ?: existingOpenGroup?.name
val infoUpdates = pollInfo.details?.infoUpdates ?: existingOpenGroup?.infoUpdates
if (publicKey == null) return
val openGroup = OpenGroup(
server = server,
room = pollInfo.token,
name = name ?: "",
publicKey = publicKey,
imageId = (pollInfo.details?.imageId ?: existingOpenGroup?.imageId),
canWrite = pollInfo.write,
infoUpdates = infoUpdates ?: 0
)
// - Open Group changes
storage.updateOpenGroup(openGroup)
// - User Count
storage.setUserCount(roomToken, server, pollInfo.activeUsers)
// - Moderators
pollInfo.details?.moderators?.let { moderatorList ->
storage.setGroupMemberRoles(moderatorList.map {
GroupMember(groupId, it, GroupMemberRole.MODERATOR)
})
}
pollInfo.details?.hiddenModerators?.let { moderatorList ->
storage.setGroupMemberRoles(moderatorList.map {
GroupMember(groupId, it, GroupMemberRole.HIDDEN_MODERATOR)
})
}
// - Admins
pollInfo.details?.admins?.let { moderatorList ->
storage.setGroupMemberRoles(moderatorList.map {
GroupMember(groupId, it, GroupMemberRole.ADMIN)
})
}
pollInfo.details?.hiddenAdmins?.let { moderatorList ->
storage.setGroupMemberRoles(moderatorList.map {
GroupMember(groupId, it, GroupMemberRole.HIDDEN_ADMIN)
})
}
// Update the group avatar
if (
(
pollInfo.details != null &&
pollInfo.details.imageId != null && (
pollInfo.details.imageId != existingOpenGroup?.imageId ||
!storage.hasDownloadedProfilePicture(dbGroupId)
) &&
storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, pollInfo.details.imageId) == null
) || (
pollInfo.details == null &&
existingOpenGroup?.imageId != null &&
!storage.hasDownloadedProfilePicture(dbGroupId) &&
storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, existingOpenGroup.imageId) == null
)
) {
JobQueue.shared.add(GroupAvatarDownloadJob(server, roomToken, openGroup.imageId))
}
else if (
pollInfo.details != null &&
pollInfo.details.imageId == null &&
existingOpenGroup?.imageId != null
) {
storage.removeProfilePicture(dbGroupId)
}
}
}
fun startIfNeeded() {
if (hasStarted) { return }
hasStarted = true
runId = UUID.randomUUID()
future = executorService?.schedule(::poll, 0, TimeUnit.MILLISECONDS)
}
@ -57,6 +144,7 @@ class OpenGroupPoller(private val server: String, private val executorService: S
}
fun poll(isPostCapabilitiesRetry: Boolean = false): Promise<Unit, Exception> {
val currentRunId = runId
val storage = MessagingModuleConfiguration.shared.storage
val rooms = storage.getAllOpenGroups().values.filter { it.server == server }.map { it.room }
@ -86,22 +174,30 @@ class OpenGroupPoller(private val server: String, private val executorService: S
isCaughtUp = true
}
}
executorService?.schedule(this@OpenGroupPoller::poll, pollInterval, TimeUnit.MILLISECONDS)
// Only poll again if it's the same poller run
if (currentRunId == runId) {
future = executorService?.schedule(this@OpenGroupPoller::poll, pollInterval, TimeUnit.MILLISECONDS)
}
}.fail {
updateCapabilitiesIfNeeded(isPostCapabilitiesRetry, it)
updateCapabilitiesIfNeeded(isPostCapabilitiesRetry, currentRunId, it)
}.map { }
}
private fun updateCapabilitiesIfNeeded(isPostCapabilitiesRetry: Boolean, exception: Exception) {
private fun updateCapabilitiesIfNeeded(isPostCapabilitiesRetry: Boolean, currentRunId: UUID, exception: Exception) {
if (exception is OnionRequestAPI.HTTPRequestFailedBlindingRequiredException) {
if (!isPostCapabilitiesRetry) {
OpenGroupApi.getCapabilities(server).map {
handleCapabilities(server, it)
}
executorService?.schedule({ poll(isPostCapabilitiesRetry = true) }, pollInterval, TimeUnit.MILLISECONDS)
// Only poll again if it's the same poller run
if (currentRunId == runId) {
future = executorService?.schedule({ poll(isPostCapabilitiesRetry = true) }, pollInterval, TimeUnit.MILLISECONDS)
}
}
} else {
executorService?.schedule(this@OpenGroupPoller::poll, pollInterval, TimeUnit.MILLISECONDS)
} else if (currentRunId == runId) {
future = executorService?.schedule(this@OpenGroupPoller::poll, pollInterval, TimeUnit.MILLISECONDS)
}
}
@ -110,82 +206,6 @@ class OpenGroupPoller(private val server: String, private val executorService: S
storage.setServerCapabilities(server, capabilities.capabilities)
}
private fun handleRoomPollInfo(
server: String,
roomToken: String,
pollInfo: OpenGroupApi.RoomPollInfo
) {
val storage = MessagingModuleConfiguration.shared.storage
val groupId = "$server.$roomToken"
val dbGroupId = GroupUtil.getEncodedOpenGroupID(groupId.toByteArray())
val existingOpenGroup = storage.getOpenGroup(roomToken, server)
val publicKey = existingOpenGroup?.publicKey ?: return
val openGroup = OpenGroup(
server = server,
room = pollInfo.token,
name = if (pollInfo.details != null) { pollInfo.details.name } else { existingOpenGroup.name },
publicKey = publicKey,
imageId = if (pollInfo.details != null) { pollInfo.details.imageId } else { existingOpenGroup.imageId },
canWrite = pollInfo.write,
infoUpdates = if (pollInfo.details != null) { pollInfo.details.infoUpdates } else { existingOpenGroup.infoUpdates }
)
// - Open Group changes
storage.updateOpenGroup(openGroup)
// - User Count
storage.setUserCount(roomToken, server, pollInfo.activeUsers)
// - Moderators
pollInfo.details?.moderators?.let { moderatorList ->
storage.setGroupMemberRoles(moderatorList.map {
GroupMember(groupId, it, GroupMemberRole.MODERATOR)
})
}
pollInfo.details?.hiddenModerators?.let { moderatorList ->
storage.setGroupMemberRoles(moderatorList.map {
GroupMember(groupId, it, GroupMemberRole.HIDDEN_MODERATOR)
})
}
// - Admins
pollInfo.details?.admins?.let { moderatorList ->
storage.setGroupMemberRoles(moderatorList.map {
GroupMember(groupId, it, GroupMemberRole.ADMIN)
})
}
pollInfo.details?.hiddenAdmins?.let { moderatorList ->
storage.setGroupMemberRoles(moderatorList.map {
GroupMember(groupId, it, GroupMemberRole.HIDDEN_ADMIN)
})
}
// Update the group avatar
if (
(
pollInfo.details != null &&
pollInfo.details.imageId != null && (
pollInfo.details.imageId != existingOpenGroup.imageId ||
!storage.hasDownloadedProfilePicture(dbGroupId)
) &&
storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, pollInfo.details.imageId) == null
) || (
pollInfo.details == null &&
existingOpenGroup.imageId != null &&
!storage.hasDownloadedProfilePicture(dbGroupId) &&
storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, existingOpenGroup.imageId) == null
)
) {
JobQueue.shared.add(GroupAvatarDownloadJob(server, roomToken, existingOpenGroup.imageId))
}
else if (
pollInfo.details != null &&
pollInfo.details.imageId == null &&
existingOpenGroup.imageId != null
) {
storage.removeProfilePicture(dbGroupId)
}
}
private fun handleMessages(
server: String,
roomToken: String,