diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt index 92c3ded6d4..e93a9b6df8 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt @@ -54,7 +54,7 @@ class PublicChatManager(private val context: Context) { if (!pollers.containsKey(threadId)) { pollers[threadId] = poller } } for ((threadId, chat) in v2Chats) { - val poller = v2Pollers[threadId] ?: OpenGroupV2Poller(chat, executorService) + val poller = v2Pollers[threadId] ?: OpenGroupV2Poller(listOf(chat), executorService) poller.startIfNeeded() listenToThreadDeletion(threadId) if (!v2Pollers.containsKey(threadId)) { v2Pollers[threadId] = poller } diff --git a/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupAPIV2.kt b/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupAPIV2.kt index c0f5a62786..0c67cebc98 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupAPIV2.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupAPIV2.kt @@ -323,13 +323,17 @@ object OpenGroupAPIV2 { // endregion // region Moderation + private fun handleModerators(serverRoomId: String, moderatorList: List) { + moderators[serverRoomId] = moderatorList.toMutableSet() + } + fun getModerators(room: String, server: String): Promise, Exception> { val request = Request(verb = GET, room = room, server = server, endpoint = "moderators") return send(request).map(sharedContext) { json -> @Suppress("UNCHECKED_CAST") val moderatorsJson = json["moderators"] as? List ?: throw Error.PARSING_FAILED val id = "$server.$room" - moderators[id] = moderatorsJson.toMutableSet() + handleModerators(id, moderatorsJson) moderatorsJson } } @@ -356,6 +360,7 @@ object OpenGroupAPIV2 { // endregion // region General + @Suppress("UNCHECKED_CAST") fun getCompactPoll(rooms: List, server: String): Promise, Exception> { val requestAuths = rooms.associateWith { room -> getAuthToken(room, server) } val storage = MessagingModuleConfiguration.shared.storage @@ -376,8 +381,51 @@ object OpenGroupAPIV2 { val request = Request(verb = POST, room = null, server = server, endpoint = "compact_poll", isAuthRequired = false, parameters = mapOf("requests" to requests)) // build a request for all rooms return send(request = request).map(sharedContext) { json -> - val results = json["results"] as? Map<*, *> ?: throw Error.PARSING_FAILED - TODO() + val results = json["results"] as? List<*> ?: throw Error.PARSING_FAILED + + results.mapNotNull { roomJson -> + if (roomJson !is Map<*,*>) return@mapNotNull null + val roomId = roomJson["room_id"] as? String ?: return@mapNotNull null + + // check the status was fine + val statusCode = roomJson["status_code"] as? Int ?: return@mapNotNull null + if (statusCode == 401) { + // delete auth token and return null + storage.removeAuthToken(roomId, server) + } + + // check and store mods + val moderators = roomJson["moderators"] as? List ?: return@mapNotNull null + handleModerators("$server.$roomId", moderators) + + // get deletions + val type = TypeFactory.defaultInstance().constructCollectionType(List::class.java, MessageDeletion::class.java) + val idsAsString = JsonUtil.toJson(roomJson["deletions"]) + val deletedServerIDs = JsonUtil.fromJson>(idsAsString, type) ?: throw Error.PARSING_FAILED + val lastDeletionServerId = storage.getLastDeletionServerId(roomId, server) ?: 0 + val serverID = deletedServerIDs.maxByOrNull {it.id } ?: MessageDeletion.EMPTY + if (serverID.id > lastDeletionServerId) { + storage.setLastDeletionServerId(roomId, server, serverID.id) + } + + // get messages + val rawMessages = roomJson["messages"] as? List> ?: return@mapNotNull null // parsing failed + + val lastMessageServerId = storage.getLastMessageServerId(roomId, server) ?: 0 + var currentMax = lastMessageServerId + val messages = rawMessages.mapNotNull { rawMessage -> + val message = OpenGroupMessageV2.fromJSON(rawMessage)?.apply { + currentMax = maxOf(currentMax,this.serverID ?: 0) + } + message + } + storage.setLastMessageServerId(roomId, server, currentMax) + roomId to CompactPollResult( + messages = messages, + deletions = deletedServerIDs.map { it.deletedMessageId }, + moderators = moderators + ) + }.toMap() } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt index 9e4ad9ecec..c42c5e3ec7 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt @@ -222,7 +222,7 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS // Parse stickers if needed // Persist the message message.threadID = threadID - val messageID = storage.persist(message, quoteModel, linkPreviews, message.groupPublicKey, openGroupID, attachments) ?: throw MessageReceiver.Error.NoThread + val messageID = storage.persist(message, quoteModel, linkPreviews, message.groupPublicKey, openGroupID, attachments) ?: throw MessageReceiver.Error.DuplicateMessage // Parse & persist attachments // Start attachment downloads if needed storage.getAttachmentsForMessage(messageID).forEach { attachment -> diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupV2Poller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupV2Poller.kt index 3eb07b78ea..45dd183c21 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupV2Poller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupV2Poller.kt @@ -1,23 +1,22 @@ package org.session.libsession.messaging.sending_receiving.pollers import nl.komponents.kovenant.Promise -import nl.komponents.kovenant.deferred import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.jobs.MessageReceiveJob import org.session.libsession.messaging.open_groups.OpenGroupAPIV2 +import org.session.libsession.messaging.open_groups.OpenGroupMessageV2 import org.session.libsession.messaging.open_groups.OpenGroupV2 import org.session.libsession.messaging.threads.Address import org.session.libsession.utilities.GroupUtil import org.session.libsignal.service.internal.push.SignalServiceProtos -import org.session.libsignal.service.loki.database.LokiMessageDatabaseProtocol import org.session.libsignal.utilities.logging.Log import org.session.libsignal.utilities.successBackground import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledFuture import java.util.concurrent.TimeUnit -class OpenGroupV2Poller(private val openGroup: OpenGroupV2, private val executorService: ScheduledExecutorService? = null) { +class OpenGroupV2Poller(private val openGroups: List, private val executorService: ScheduledExecutorService? = null) { private var hasStarted = false @Volatile private var isPollOngoing = false @@ -25,22 +24,29 @@ class OpenGroupV2Poller(private val openGroup: OpenGroupV2, private val executor private val cancellableFutures = mutableListOf>() + // use this as a receive time-based window to calculate re-poll interval + private val receivedQueue = ArrayDeque(50) + + private fun calculatePollInterval(): Long { + // sample last default poll time * 2 + while (receivedQueue.size > 50) { + receivedQueue.removeLast() + } + val sampleWindow = System.currentTimeMillis() - pollForNewMessagesInterval * 2 + val numberInSample = receivedQueue.toList().filter { it > sampleWindow }.size.coerceAtLeast(1) + return ((2 + (50 / numberInSample / 20)*5) * 1000).toLong() + } + // region Settings companion object { private val pollForNewMessagesInterval: Long = 10 * 1000 - private val pollForDeletedMessagesInterval: Long = 60 * 1000 - private val pollForModeratorsInterval: Long = 10 * 60 * 1000 } // endregion // region Lifecycle fun startIfNeeded() { if (hasStarted || executorService == null) return - cancellableFutures += listOf( - executorService.scheduleAtFixedRate(::pollForNewMessages,0, pollForNewMessagesInterval, TimeUnit.MILLISECONDS), - executorService.scheduleAtFixedRate(::pollForDeletedMessages,0, pollForDeletedMessagesInterval, TimeUnit.MILLISECONDS), - executorService.scheduleAtFixedRate(::pollForModerators,0, pollForModeratorsInterval, TimeUnit.MILLISECONDS), - ) + cancellableFutures += executorService.schedule(::compactPoll, 0, TimeUnit.MILLISECONDS) hasStarted = true } @@ -54,75 +60,75 @@ class OpenGroupV2Poller(private val openGroup: OpenGroupV2, private val executor // endregion // region Polling - fun pollForNewMessages(): Promise { - return pollForNewMessages(false) + + private fun compactPoll(): Promise { + return compactPoll(false) } - private fun pollForNewMessages(isBackgroundPoll: Boolean): Promise { - if (isPollOngoing) { return Promise.of(Unit) } + fun compactPoll(isBackgroundPoll: Boolean): Promise { + if (isPollOngoing) return Promise.of(Unit) isPollOngoing = true - val deferred = deferred() - // Kovenant propagates a context to chained promises, so OpenGroupAPI.sharedContext should be used for all of the below - OpenGroupAPIV2.getMessages(openGroup.room, openGroup.server).successBackground { messages -> - // Process messages in the background - messages.forEach { message -> - try { - val senderPublicKey = message.sender!! - // Main message - val dataMessageProto = message.toProto() - // Content - val content = SignalServiceProtos.Content.newBuilder() - content.dataMessage = dataMessageProto - // Envelope - val builder = SignalServiceProtos.Envelope.newBuilder() - builder.type = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE - builder.source = senderPublicKey - builder.sourceDevice = 1 - builder.content = content.build().toByteString() - builder.timestamp = message.sentTimestamp - val envelope = builder.build() - val job = MessageReceiveJob(envelope.toByteArray(), isBackgroundPoll, message.serverID, openGroup.id) - Log.d("Loki", "Scheduling Job $job") - if (isBackgroundPoll) { - job.executeAsync().always { deferred.resolve(Unit) } - // The promise is just used to keep track of when we're done - } else { - JobQueue.shared.add(job) - } - } catch (e: Exception) { - Log.e("Loki", "Exception parsing message", e) - } + val server = openGroups.first().server // assume all the same server + val rooms = openGroups.map { it.room } + return OpenGroupAPIV2.getCompactPoll(rooms = rooms, server).successBackground { results -> + results.forEach { (room, results) -> + val serverRoomId = "$server.$room" + handleDeletedMessages(serverRoomId,results.deletions) + handleNewMessages(serverRoomId, results.messages, isBackgroundPoll) } - isCaughtUp = true - isPollOngoing = false - deferred.resolve(Unit) - }.fail { - Log.e("Loki", "Failed to get messages for group chat with room: ${openGroup.room} on server: ${openGroup.server}.", it) + }.always { isPollOngoing = false + if (!isBackgroundPoll) { + val delay = calculatePollInterval() + Log.d("Loki", "polling in ${delay}ms") + executorService?.schedule(this@OpenGroupV2Poller::compactPoll, delay, TimeUnit.MILLISECONDS) + } } - return deferred.promise } - private fun pollForDeletedMessages() { + private fun handleNewMessages(serverRoomId: String, newMessages: List, isBackgroundPoll: Boolean) { + newMessages.forEach { message -> + try { + val senderPublicKey = message.sender!! + // Main message + val dataMessageProto = message.toProto() + // Content + val content = SignalServiceProtos.Content.newBuilder() + content.dataMessage = dataMessageProto + // Envelope + val builder = SignalServiceProtos.Envelope.newBuilder() + builder.type = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE + builder.source = senderPublicKey + builder.sourceDevice = 1 + builder.content = content.build().toByteString() + builder.timestamp = message.sentTimestamp + val envelope = builder.build() + val job = MessageReceiveJob(envelope.toByteArray(), isBackgroundPoll, message.serverID, serverRoomId) + Log.d("Loki", "Scheduling Job $job") + if (isBackgroundPoll) { + job.executeAsync() + // The promise is just used to keep track of when we're done + } else { + JobQueue.shared.add(job) + } + receivedQueue.addFirst(message.sentTimestamp) + } catch (e: Exception) { + Log.e("Loki", "Exception parsing message", e) + } + } + } + + private fun handleDeletedMessages(serverRoomId: String, deletedMessageServerIDs: List) { val messagingModule = MessagingModuleConfiguration.shared - val address = GroupUtil.getEncodedOpenGroupID(openGroup.id.toByteArray()) + val address = GroupUtil.getEncodedOpenGroupID(serverRoomId.toByteArray()) val threadId = messagingModule.storage.getThreadIdFor(Address.fromSerialized(address)) ?: return - OpenGroupAPIV2.getDeletedMessages(openGroup.room, openGroup.server).success { deletedMessageServerIDs -> - - val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { serverId -> - messagingModule.messageDataProvider.getMessageID(serverId.deletedMessageId, threadId) - } - deletedMessageIDs.forEach { (messageId, isSms) -> - MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms) - } - }.fail { - Log.d("Loki", "Failed to get deleted messages for group chat with ID: ${openGroup.room} on server: ${openGroup.server}.") + val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { serverId -> + messagingModule.messageDataProvider.getMessageID(serverId, threadId) + } + deletedMessageIDs.forEach { (messageId, isSms) -> + MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms) } - } - - private fun pollForModerators() { - OpenGroupAPIV2.getModerators(openGroup.room, openGroup.server) } // endregion } \ No newline at end of file