mirror of
https://github.com/oxen-io/session-android.git
synced 2025-02-17 14:38:26 +00:00
refactor: compact polling and usage based polling
This commit is contained in:
parent
460babe930
commit
7e14a315b9
@ -54,7 +54,7 @@ class PublicChatManager(private val context: Context) {
|
|||||||
if (!pollers.containsKey(threadId)) { pollers[threadId] = poller }
|
if (!pollers.containsKey(threadId)) { pollers[threadId] = poller }
|
||||||
}
|
}
|
||||||
for ((threadId, chat) in v2Chats) {
|
for ((threadId, chat) in v2Chats) {
|
||||||
val poller = v2Pollers[threadId] ?: OpenGroupV2Poller(chat, executorService)
|
val poller = v2Pollers[threadId] ?: OpenGroupV2Poller(listOf(chat), executorService)
|
||||||
poller.startIfNeeded()
|
poller.startIfNeeded()
|
||||||
listenToThreadDeletion(threadId)
|
listenToThreadDeletion(threadId)
|
||||||
if (!v2Pollers.containsKey(threadId)) { v2Pollers[threadId] = poller }
|
if (!v2Pollers.containsKey(threadId)) { v2Pollers[threadId] = poller }
|
||||||
|
@ -323,13 +323,17 @@ object OpenGroupAPIV2 {
|
|||||||
// endregion
|
// endregion
|
||||||
|
|
||||||
// region Moderation
|
// region Moderation
|
||||||
|
private fun handleModerators(serverRoomId: String, moderatorList: List<String>) {
|
||||||
|
moderators[serverRoomId] = moderatorList.toMutableSet()
|
||||||
|
}
|
||||||
|
|
||||||
fun getModerators(room: String, server: String): Promise<List<String>, Exception> {
|
fun getModerators(room: String, server: String): Promise<List<String>, Exception> {
|
||||||
val request = Request(verb = GET, room = room, server = server, endpoint = "moderators")
|
val request = Request(verb = GET, room = room, server = server, endpoint = "moderators")
|
||||||
return send(request).map(sharedContext) { json ->
|
return send(request).map(sharedContext) { json ->
|
||||||
@Suppress("UNCHECKED_CAST") val moderatorsJson = json["moderators"] as? List<String>
|
@Suppress("UNCHECKED_CAST") val moderatorsJson = json["moderators"] as? List<String>
|
||||||
?: throw Error.PARSING_FAILED
|
?: throw Error.PARSING_FAILED
|
||||||
val id = "$server.$room"
|
val id = "$server.$room"
|
||||||
moderators[id] = moderatorsJson.toMutableSet()
|
handleModerators(id, moderatorsJson)
|
||||||
moderatorsJson
|
moderatorsJson
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -356,6 +360,7 @@ object OpenGroupAPIV2 {
|
|||||||
// endregion
|
// endregion
|
||||||
|
|
||||||
// region General
|
// region General
|
||||||
|
@Suppress("UNCHECKED_CAST")
|
||||||
fun getCompactPoll(rooms: List<String>, server: String): Promise<Map<String, CompactPollResult>, Exception> {
|
fun getCompactPoll(rooms: List<String>, server: String): Promise<Map<String, CompactPollResult>, Exception> {
|
||||||
val requestAuths = rooms.associateWith { room -> getAuthToken(room, server) }
|
val requestAuths = rooms.associateWith { room -> getAuthToken(room, server) }
|
||||||
val storage = MessagingModuleConfiguration.shared.storage
|
val storage = MessagingModuleConfiguration.shared.storage
|
||||||
@ -376,8 +381,51 @@ object OpenGroupAPIV2 {
|
|||||||
val request = Request(verb = POST, room = null, server = server, endpoint = "compact_poll", isAuthRequired = false, parameters = mapOf("requests" to requests))
|
val request = Request(verb = POST, room = null, server = server, endpoint = "compact_poll", isAuthRequired = false, parameters = mapOf("requests" to requests))
|
||||||
// build a request for all rooms
|
// build a request for all rooms
|
||||||
return send(request = request).map(sharedContext) { json ->
|
return send(request = request).map(sharedContext) { json ->
|
||||||
val results = json["results"] as? Map<*, *> ?: throw Error.PARSING_FAILED
|
val results = json["results"] as? List<*> ?: throw Error.PARSING_FAILED
|
||||||
TODO()
|
|
||||||
|
results.mapNotNull { roomJson ->
|
||||||
|
if (roomJson !is Map<*,*>) return@mapNotNull null
|
||||||
|
val roomId = roomJson["room_id"] as? String ?: return@mapNotNull null
|
||||||
|
|
||||||
|
// check the status was fine
|
||||||
|
val statusCode = roomJson["status_code"] as? Int ?: return@mapNotNull null
|
||||||
|
if (statusCode == 401) {
|
||||||
|
// delete auth token and return null
|
||||||
|
storage.removeAuthToken(roomId, server)
|
||||||
|
}
|
||||||
|
|
||||||
|
// check and store mods
|
||||||
|
val moderators = roomJson["moderators"] as? List<String> ?: return@mapNotNull null
|
||||||
|
handleModerators("$server.$roomId", moderators)
|
||||||
|
|
||||||
|
// get deletions
|
||||||
|
val type = TypeFactory.defaultInstance().constructCollectionType(List::class.java, MessageDeletion::class.java)
|
||||||
|
val idsAsString = JsonUtil.toJson(roomJson["deletions"])
|
||||||
|
val deletedServerIDs = JsonUtil.fromJson<List<MessageDeletion>>(idsAsString, type) ?: throw Error.PARSING_FAILED
|
||||||
|
val lastDeletionServerId = storage.getLastDeletionServerId(roomId, server) ?: 0
|
||||||
|
val serverID = deletedServerIDs.maxByOrNull {it.id } ?: MessageDeletion.EMPTY
|
||||||
|
if (serverID.id > lastDeletionServerId) {
|
||||||
|
storage.setLastDeletionServerId(roomId, server, serverID.id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// get messages
|
||||||
|
val rawMessages = roomJson["messages"] as? List<Map<String, Any>> ?: return@mapNotNull null // parsing failed
|
||||||
|
|
||||||
|
val lastMessageServerId = storage.getLastMessageServerId(roomId, server) ?: 0
|
||||||
|
var currentMax = lastMessageServerId
|
||||||
|
val messages = rawMessages.mapNotNull { rawMessage ->
|
||||||
|
val message = OpenGroupMessageV2.fromJSON(rawMessage)?.apply {
|
||||||
|
currentMax = maxOf(currentMax,this.serverID ?: 0)
|
||||||
|
}
|
||||||
|
message
|
||||||
|
}
|
||||||
|
storage.setLastMessageServerId(roomId, server, currentMax)
|
||||||
|
roomId to CompactPollResult(
|
||||||
|
messages = messages,
|
||||||
|
deletions = deletedServerIDs.map { it.deletedMessageId },
|
||||||
|
moderators = moderators
|
||||||
|
)
|
||||||
|
}.toMap()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -222,7 +222,7 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
|
|||||||
// Parse stickers if needed
|
// Parse stickers if needed
|
||||||
// Persist the message
|
// Persist the message
|
||||||
message.threadID = threadID
|
message.threadID = threadID
|
||||||
val messageID = storage.persist(message, quoteModel, linkPreviews, message.groupPublicKey, openGroupID, attachments) ?: throw MessageReceiver.Error.NoThread
|
val messageID = storage.persist(message, quoteModel, linkPreviews, message.groupPublicKey, openGroupID, attachments) ?: throw MessageReceiver.Error.DuplicateMessage
|
||||||
// Parse & persist attachments
|
// Parse & persist attachments
|
||||||
// Start attachment downloads if needed
|
// Start attachment downloads if needed
|
||||||
storage.getAttachmentsForMessage(messageID).forEach { attachment ->
|
storage.getAttachmentsForMessage(messageID).forEach { attachment ->
|
||||||
|
@ -1,23 +1,22 @@
|
|||||||
package org.session.libsession.messaging.sending_receiving.pollers
|
package org.session.libsession.messaging.sending_receiving.pollers
|
||||||
|
|
||||||
import nl.komponents.kovenant.Promise
|
import nl.komponents.kovenant.Promise
|
||||||
import nl.komponents.kovenant.deferred
|
|
||||||
import org.session.libsession.messaging.MessagingModuleConfiguration
|
import org.session.libsession.messaging.MessagingModuleConfiguration
|
||||||
import org.session.libsession.messaging.jobs.JobQueue
|
import org.session.libsession.messaging.jobs.JobQueue
|
||||||
import org.session.libsession.messaging.jobs.MessageReceiveJob
|
import org.session.libsession.messaging.jobs.MessageReceiveJob
|
||||||
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
|
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
|
||||||
|
import org.session.libsession.messaging.open_groups.OpenGroupMessageV2
|
||||||
import org.session.libsession.messaging.open_groups.OpenGroupV2
|
import org.session.libsession.messaging.open_groups.OpenGroupV2
|
||||||
import org.session.libsession.messaging.threads.Address
|
import org.session.libsession.messaging.threads.Address
|
||||||
import org.session.libsession.utilities.GroupUtil
|
import org.session.libsession.utilities.GroupUtil
|
||||||
import org.session.libsignal.service.internal.push.SignalServiceProtos
|
import org.session.libsignal.service.internal.push.SignalServiceProtos
|
||||||
import org.session.libsignal.service.loki.database.LokiMessageDatabaseProtocol
|
|
||||||
import org.session.libsignal.utilities.logging.Log
|
import org.session.libsignal.utilities.logging.Log
|
||||||
import org.session.libsignal.utilities.successBackground
|
import org.session.libsignal.utilities.successBackground
|
||||||
import java.util.concurrent.ScheduledExecutorService
|
import java.util.concurrent.ScheduledExecutorService
|
||||||
import java.util.concurrent.ScheduledFuture
|
import java.util.concurrent.ScheduledFuture
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
class OpenGroupV2Poller(private val openGroup: OpenGroupV2, private val executorService: ScheduledExecutorService? = null) {
|
class OpenGroupV2Poller(private val openGroups: List<OpenGroupV2>, private val executorService: ScheduledExecutorService? = null) {
|
||||||
|
|
||||||
private var hasStarted = false
|
private var hasStarted = false
|
||||||
@Volatile private var isPollOngoing = false
|
@Volatile private var isPollOngoing = false
|
||||||
@ -25,22 +24,29 @@ class OpenGroupV2Poller(private val openGroup: OpenGroupV2, private val executor
|
|||||||
|
|
||||||
private val cancellableFutures = mutableListOf<ScheduledFuture<out Any>>()
|
private val cancellableFutures = mutableListOf<ScheduledFuture<out Any>>()
|
||||||
|
|
||||||
|
// use this as a receive time-based window to calculate re-poll interval
|
||||||
|
private val receivedQueue = ArrayDeque<Long>(50)
|
||||||
|
|
||||||
|
private fun calculatePollInterval(): Long {
|
||||||
|
// sample last default poll time * 2
|
||||||
|
while (receivedQueue.size > 50) {
|
||||||
|
receivedQueue.removeLast()
|
||||||
|
}
|
||||||
|
val sampleWindow = System.currentTimeMillis() - pollForNewMessagesInterval * 2
|
||||||
|
val numberInSample = receivedQueue.toList().filter { it > sampleWindow }.size.coerceAtLeast(1)
|
||||||
|
return ((2 + (50 / numberInSample / 20)*5) * 1000).toLong()
|
||||||
|
}
|
||||||
|
|
||||||
// region Settings
|
// region Settings
|
||||||
companion object {
|
companion object {
|
||||||
private val pollForNewMessagesInterval: Long = 10 * 1000
|
private val pollForNewMessagesInterval: Long = 10 * 1000
|
||||||
private val pollForDeletedMessagesInterval: Long = 60 * 1000
|
|
||||||
private val pollForModeratorsInterval: Long = 10 * 60 * 1000
|
|
||||||
}
|
}
|
||||||
// endregion
|
// endregion
|
||||||
|
|
||||||
// region Lifecycle
|
// region Lifecycle
|
||||||
fun startIfNeeded() {
|
fun startIfNeeded() {
|
||||||
if (hasStarted || executorService == null) return
|
if (hasStarted || executorService == null) return
|
||||||
cancellableFutures += listOf(
|
cancellableFutures += executorService.schedule(::compactPoll, 0, TimeUnit.MILLISECONDS)
|
||||||
executorService.scheduleAtFixedRate(::pollForNewMessages,0, pollForNewMessagesInterval, TimeUnit.MILLISECONDS),
|
|
||||||
executorService.scheduleAtFixedRate(::pollForDeletedMessages,0, pollForDeletedMessagesInterval, TimeUnit.MILLISECONDS),
|
|
||||||
executorService.scheduleAtFixedRate(::pollForModerators,0, pollForModeratorsInterval, TimeUnit.MILLISECONDS),
|
|
||||||
)
|
|
||||||
hasStarted = true
|
hasStarted = true
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -54,75 +60,75 @@ class OpenGroupV2Poller(private val openGroup: OpenGroupV2, private val executor
|
|||||||
// endregion
|
// endregion
|
||||||
|
|
||||||
// region Polling
|
// region Polling
|
||||||
fun pollForNewMessages(): Promise<Unit, Exception> {
|
|
||||||
return pollForNewMessages(false)
|
private fun compactPoll(): Promise<Any, Exception> {
|
||||||
|
return compactPoll(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun pollForNewMessages(isBackgroundPoll: Boolean): Promise<Unit, Exception> {
|
fun compactPoll(isBackgroundPoll: Boolean): Promise<Any, Exception> {
|
||||||
if (isPollOngoing) { return Promise.of(Unit) }
|
if (isPollOngoing) return Promise.of(Unit)
|
||||||
isPollOngoing = true
|
isPollOngoing = true
|
||||||
val deferred = deferred<Unit, Exception>()
|
val server = openGroups.first().server // assume all the same server
|
||||||
// Kovenant propagates a context to chained promises, so OpenGroupAPI.sharedContext should be used for all of the below
|
val rooms = openGroups.map { it.room }
|
||||||
OpenGroupAPIV2.getMessages(openGroup.room, openGroup.server).successBackground { messages ->
|
return OpenGroupAPIV2.getCompactPoll(rooms = rooms, server).successBackground { results ->
|
||||||
// Process messages in the background
|
results.forEach { (room, results) ->
|
||||||
messages.forEach { message ->
|
val serverRoomId = "$server.$room"
|
||||||
try {
|
handleDeletedMessages(serverRoomId,results.deletions)
|
||||||
val senderPublicKey = message.sender!!
|
handleNewMessages(serverRoomId, results.messages, isBackgroundPoll)
|
||||||
// Main message
|
|
||||||
val dataMessageProto = message.toProto()
|
|
||||||
// Content
|
|
||||||
val content = SignalServiceProtos.Content.newBuilder()
|
|
||||||
content.dataMessage = dataMessageProto
|
|
||||||
// Envelope
|
|
||||||
val builder = SignalServiceProtos.Envelope.newBuilder()
|
|
||||||
builder.type = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE
|
|
||||||
builder.source = senderPublicKey
|
|
||||||
builder.sourceDevice = 1
|
|
||||||
builder.content = content.build().toByteString()
|
|
||||||
builder.timestamp = message.sentTimestamp
|
|
||||||
val envelope = builder.build()
|
|
||||||
val job = MessageReceiveJob(envelope.toByteArray(), isBackgroundPoll, message.serverID, openGroup.id)
|
|
||||||
Log.d("Loki", "Scheduling Job $job")
|
|
||||||
if (isBackgroundPoll) {
|
|
||||||
job.executeAsync().always { deferred.resolve(Unit) }
|
|
||||||
// The promise is just used to keep track of when we're done
|
|
||||||
} else {
|
|
||||||
JobQueue.shared.add(job)
|
|
||||||
}
|
|
||||||
} catch (e: Exception) {
|
|
||||||
Log.e("Loki", "Exception parsing message", e)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
isCaughtUp = true
|
}.always {
|
||||||
isPollOngoing = false
|
|
||||||
deferred.resolve(Unit)
|
|
||||||
}.fail {
|
|
||||||
Log.e("Loki", "Failed to get messages for group chat with room: ${openGroup.room} on server: ${openGroup.server}.", it)
|
|
||||||
isPollOngoing = false
|
isPollOngoing = false
|
||||||
|
if (!isBackgroundPoll) {
|
||||||
|
val delay = calculatePollInterval()
|
||||||
|
Log.d("Loki", "polling in ${delay}ms")
|
||||||
|
executorService?.schedule(this@OpenGroupV2Poller::compactPoll, delay, TimeUnit.MILLISECONDS)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return deferred.promise
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun pollForDeletedMessages() {
|
private fun handleNewMessages(serverRoomId: String, newMessages: List<OpenGroupMessageV2>, isBackgroundPoll: Boolean) {
|
||||||
|
newMessages.forEach { message ->
|
||||||
|
try {
|
||||||
|
val senderPublicKey = message.sender!!
|
||||||
|
// Main message
|
||||||
|
val dataMessageProto = message.toProto()
|
||||||
|
// Content
|
||||||
|
val content = SignalServiceProtos.Content.newBuilder()
|
||||||
|
content.dataMessage = dataMessageProto
|
||||||
|
// Envelope
|
||||||
|
val builder = SignalServiceProtos.Envelope.newBuilder()
|
||||||
|
builder.type = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE
|
||||||
|
builder.source = senderPublicKey
|
||||||
|
builder.sourceDevice = 1
|
||||||
|
builder.content = content.build().toByteString()
|
||||||
|
builder.timestamp = message.sentTimestamp
|
||||||
|
val envelope = builder.build()
|
||||||
|
val job = MessageReceiveJob(envelope.toByteArray(), isBackgroundPoll, message.serverID, serverRoomId)
|
||||||
|
Log.d("Loki", "Scheduling Job $job")
|
||||||
|
if (isBackgroundPoll) {
|
||||||
|
job.executeAsync()
|
||||||
|
// The promise is just used to keep track of when we're done
|
||||||
|
} else {
|
||||||
|
JobQueue.shared.add(job)
|
||||||
|
}
|
||||||
|
receivedQueue.addFirst(message.sentTimestamp)
|
||||||
|
} catch (e: Exception) {
|
||||||
|
Log.e("Loki", "Exception parsing message", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun handleDeletedMessages(serverRoomId: String, deletedMessageServerIDs: List<Long>) {
|
||||||
val messagingModule = MessagingModuleConfiguration.shared
|
val messagingModule = MessagingModuleConfiguration.shared
|
||||||
val address = GroupUtil.getEncodedOpenGroupID(openGroup.id.toByteArray())
|
val address = GroupUtil.getEncodedOpenGroupID(serverRoomId.toByteArray())
|
||||||
val threadId = messagingModule.storage.getThreadIdFor(Address.fromSerialized(address)) ?: return
|
val threadId = messagingModule.storage.getThreadIdFor(Address.fromSerialized(address)) ?: return
|
||||||
|
|
||||||
OpenGroupAPIV2.getDeletedMessages(openGroup.room, openGroup.server).success { deletedMessageServerIDs ->
|
val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { serverId ->
|
||||||
|
messagingModule.messageDataProvider.getMessageID(serverId, threadId)
|
||||||
val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { serverId ->
|
}
|
||||||
messagingModule.messageDataProvider.getMessageID(serverId.deletedMessageId, threadId)
|
deletedMessageIDs.forEach { (messageId, isSms) ->
|
||||||
}
|
MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms)
|
||||||
deletedMessageIDs.forEach { (messageId, isSms) ->
|
|
||||||
MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms)
|
|
||||||
}
|
|
||||||
}.fail {
|
|
||||||
Log.d("Loki", "Failed to get deleted messages for group chat with ID: ${openGroup.room} on server: ${openGroup.server}.")
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
private fun pollForModerators() {
|
|
||||||
OpenGroupAPIV2.getModerators(openGroup.room, openGroup.server)
|
|
||||||
}
|
}
|
||||||
// endregion
|
// endregion
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user