mirror of
https://github.com/oxen-io/session-android.git
synced 2024-12-24 16:57:50 +00:00
Merge pull request #568 from hjubb/improve_background_jobs
More Efficient Thread Trimming & Database Cleanup
This commit is contained in:
commit
ed87961850
@ -46,7 +46,6 @@ import org.thoughtcrime.securesms.database.model.MediaMmsMessageRecord;
|
|||||||
import org.thoughtcrime.securesms.database.model.MessageRecord;
|
import org.thoughtcrime.securesms.database.model.MessageRecord;
|
||||||
import org.thoughtcrime.securesms.database.model.NotificationMmsMessageRecord;
|
import org.thoughtcrime.securesms.database.model.NotificationMmsMessageRecord;
|
||||||
import org.thoughtcrime.securesms.database.model.Quote;
|
import org.thoughtcrime.securesms.database.model.Quote;
|
||||||
import org.thoughtcrime.securesms.jobs.TrimThreadJob;
|
|
||||||
import org.session.libsession.messaging.messages.signal.IncomingMediaMessage;
|
import org.session.libsession.messaging.messages.signal.IncomingMediaMessage;
|
||||||
import org.thoughtcrime.securesms.mms.MmsException;
|
import org.thoughtcrime.securesms.mms.MmsException;
|
||||||
import org.session.libsession.messaging.messages.signal.OutgoingExpirationUpdateMessage;
|
import org.session.libsession.messaging.messages.signal.OutgoingExpirationUpdateMessage;
|
||||||
@ -668,7 +667,6 @@ public class MmsDatabase extends MessagingDatabase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
notifyConversationListeners(threadId);
|
notifyConversationListeners(threadId);
|
||||||
ApplicationContext.getInstance(context).getJobManager().add(new TrimThreadJob(threadId));
|
|
||||||
|
|
||||||
return Optional.of(new InsertResult(messageId, threadId));
|
return Optional.of(new InsertResult(messageId, threadId));
|
||||||
}
|
}
|
||||||
@ -812,7 +810,6 @@ public class MmsDatabase extends MessagingDatabase {
|
|||||||
|
|
||||||
DatabaseFactory.getThreadDatabase(context).setLastSeen(threadId);
|
DatabaseFactory.getThreadDatabase(context).setLastSeen(threadId);
|
||||||
DatabaseFactory.getThreadDatabase(context).setHasSent(threadId, true);
|
DatabaseFactory.getThreadDatabase(context).setHasSent(threadId, true);
|
||||||
ApplicationContext.getInstance(context).getJobManager().add(new TrimThreadJob(threadId));
|
|
||||||
|
|
||||||
return messageId;
|
return messageId;
|
||||||
}
|
}
|
||||||
|
@ -35,7 +35,6 @@ import org.session.libsession.utilities.IdentityKeyMismatchList;
|
|||||||
import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper;
|
import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper;
|
||||||
import org.thoughtcrime.securesms.database.model.MessageRecord;
|
import org.thoughtcrime.securesms.database.model.MessageRecord;
|
||||||
import org.thoughtcrime.securesms.database.model.SmsMessageRecord;
|
import org.thoughtcrime.securesms.database.model.SmsMessageRecord;
|
||||||
import org.thoughtcrime.securesms.jobs.TrimThreadJob;
|
|
||||||
import org.session.libsession.messaging.messages.signal.IncomingGroupMessage;
|
import org.session.libsession.messaging.messages.signal.IncomingGroupMessage;
|
||||||
import org.session.libsession.messaging.messages.signal.IncomingTextMessage;
|
import org.session.libsession.messaging.messages.signal.IncomingTextMessage;
|
||||||
import org.session.libsession.messaging.messages.signal.OutgoingTextMessage;
|
import org.session.libsession.messaging.messages.signal.OutgoingTextMessage;
|
||||||
@ -414,7 +413,6 @@ public class SmsDatabase extends MessagingDatabase {
|
|||||||
|
|
||||||
notifyConversationListeners(threadId);
|
notifyConversationListeners(threadId);
|
||||||
|
|
||||||
ApplicationContext.getInstance(context).getJobManager().add(new TrimThreadJob(threadId));
|
|
||||||
|
|
||||||
return Optional.of(new InsertResult(messageId, threadId));
|
return Optional.of(new InsertResult(messageId, threadId));
|
||||||
}
|
}
|
||||||
@ -484,7 +482,6 @@ public class SmsDatabase extends MessagingDatabase {
|
|||||||
|
|
||||||
notifyConversationListeners(threadId);
|
notifyConversationListeners(threadId);
|
||||||
|
|
||||||
ApplicationContext.getInstance(context).getJobManager().add(new TrimThreadJob(threadId));
|
|
||||||
|
|
||||||
return messageId;
|
return messageId;
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package org.thoughtcrime.securesms.database
|
package org.thoughtcrime.securesms.database
|
||||||
|
|
||||||
|
import android.app.job.JobScheduler
|
||||||
import android.content.Context
|
import android.content.Context
|
||||||
import android.net.Uri
|
import android.net.Uri
|
||||||
import org.session.libsession.database.StorageProtocol
|
import org.session.libsession.database.StorageProtocol
|
||||||
@ -151,6 +152,11 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
|
|||||||
messageID = result.messageId
|
messageID = result.messageId
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
val threadID = message.threadID
|
||||||
|
// open group trim thread job is scheduled after processing in OpenGroupPollerV2
|
||||||
|
if (openGroupID.isNullOrEmpty() && threadID != null && threadID >= 0) {
|
||||||
|
JobQueue.shared.add(TrimThreadJob(threadID))
|
||||||
|
}
|
||||||
return messageID
|
return messageID
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -529,6 +535,11 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
|
|||||||
return threadDB.getLastUpdated(threadID)
|
return threadDB.getLastUpdated(threadID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun trimThread(threadID: Long, threadLimit: Int) {
|
||||||
|
val threadDB = DatabaseFactory.getThreadDatabase(context)
|
||||||
|
threadDB.trimThread(threadID, threadLimit)
|
||||||
|
}
|
||||||
|
|
||||||
override fun getAttachmentDataUri(attachmentId: AttachmentId): Uri {
|
override fun getAttachmentDataUri(attachmentId: AttachmentId): Uri {
|
||||||
return PartAuthority.getAttachmentDataUri(attachmentId)
|
return PartAuthority.getAttachmentDataUri(attachmentId)
|
||||||
}
|
}
|
||||||
|
@ -423,6 +423,7 @@ public class ThreadDatabase extends Database {
|
|||||||
DatabaseFactory.getSmsDatabase(context).deleteThread(threadId);
|
DatabaseFactory.getSmsDatabase(context).deleteThread(threadId);
|
||||||
DatabaseFactory.getMmsDatabase(context).deleteThread(threadId);
|
DatabaseFactory.getMmsDatabase(context).deleteThread(threadId);
|
||||||
DatabaseFactory.getDraftDatabase(context).clearDrafts(threadId);
|
DatabaseFactory.getDraftDatabase(context).clearDrafts(threadId);
|
||||||
|
DatabaseFactory.getLokiMessageDatabase(context).deleteThread(threadId);
|
||||||
deleteThread(threadId);
|
deleteThread(threadId);
|
||||||
notifyConversationListeners(threadId);
|
notifyConversationListeners(threadId);
|
||||||
notifyConversationListListeners();
|
notifyConversationListListeners();
|
||||||
|
@ -325,9 +325,6 @@ class HomeActivity : PassphraseRequiredActionBarActivity(), ConversationClickLis
|
|||||||
// Delete the conversation
|
// Delete the conversation
|
||||||
val v2OpenGroup = DatabaseFactory.getLokiThreadDatabase(context).getOpenGroupChat(threadID)
|
val v2OpenGroup = DatabaseFactory.getLokiThreadDatabase(context).getOpenGroupChat(threadID)
|
||||||
if (v2OpenGroup != null) {
|
if (v2OpenGroup != null) {
|
||||||
val apiDB = DatabaseFactory.getLokiAPIDatabase(context)
|
|
||||||
apiDB.removeLastMessageServerID(v2OpenGroup.room, v2OpenGroup.server)
|
|
||||||
apiDB.removeLastDeletionServerID(v2OpenGroup.room, v2OpenGroup.server)
|
|
||||||
OpenGroupManager.delete(v2OpenGroup.server, v2OpenGroup.room, this@HomeActivity)
|
OpenGroupManager.delete(v2OpenGroup.server, v2OpenGroup.room, this@HomeActivity)
|
||||||
} else {
|
} else {
|
||||||
ThreadUtils.queue {
|
ThreadUtils.queue {
|
||||||
|
@ -89,7 +89,7 @@ object OpenGroupManager {
|
|||||||
val openGroup = OpenGroupV2(server, room, info.name, publicKey)
|
val openGroup = OpenGroupV2(server, room, info.name, publicKey)
|
||||||
threadDB.setOpenGroupChat(openGroup, threadID)
|
threadDB.setOpenGroupChat(openGroup, threadID)
|
||||||
// Start the poller if needed
|
// Start the poller if needed
|
||||||
if (pollers[server] == null) {
|
pollers[server]?.startIfNeeded() ?: run {
|
||||||
val poller = OpenGroupPollerV2(server, executorService)
|
val poller = OpenGroupPollerV2(server, executorService)
|
||||||
Util.runOnMain { poller.startIfNeeded() }
|
Util.runOnMain { poller.startIfNeeded() }
|
||||||
pollers[server] = poller
|
pollers[server] = poller
|
||||||
@ -111,9 +111,11 @@ object OpenGroupManager {
|
|||||||
pollers.remove(server)
|
pollers.remove(server)
|
||||||
}
|
}
|
||||||
// Delete
|
// Delete
|
||||||
|
storage.removeLastDeletionServerID(room, server)
|
||||||
|
storage.removeLastMessageServerID(room, server)
|
||||||
|
val lokiThreadDB = DatabaseFactory.getLokiThreadDatabase(context)
|
||||||
|
lokiThreadDB.removeOpenGroupChat(threadID)
|
||||||
ThreadUtils.queue {
|
ThreadUtils.queue {
|
||||||
storage.removeLastDeletionServerID(room, server)
|
|
||||||
storage.removeLastMessageServerID(room, server)
|
|
||||||
GroupManager.deleteGroup(groupID, context) // Must be invoked on a background thread
|
GroupManager.deleteGroup(groupID, context) // Must be invoked on a background thread
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ import org.thoughtcrime.securesms.database.DatabaseFactory
|
|||||||
import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper
|
import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper
|
||||||
import org.thoughtcrime.securesms.loki.utilities.*
|
import org.thoughtcrime.securesms.loki.utilities.*
|
||||||
import org.session.libsignal.database.LokiMessageDatabaseProtocol
|
import org.session.libsignal.database.LokiMessageDatabaseProtocol
|
||||||
|
import org.session.libsignal.utilities.Log
|
||||||
|
|
||||||
class LokiMessageDatabase(context: Context, helper: SQLCipherOpenHelper) : Database(context, helper), LokiMessageDatabaseProtocol {
|
class LokiMessageDatabase(context: Context, helper: SQLCipherOpenHelper) : Database(context, helper), LokiMessageDatabaseProtocol {
|
||||||
|
|
||||||
@ -131,4 +132,26 @@ class LokiMessageDatabase(context: Context, helper: SQLCipherOpenHelper) : Datab
|
|||||||
contentValues.put(Companion.errorMessage, errorMessage)
|
contentValues.put(Companion.errorMessage, errorMessage)
|
||||||
database.insertOrUpdate(errorMessageTable, contentValues, "${Companion.messageID} = ?", arrayOf(messageID.toString()))
|
database.insertOrUpdate(errorMessageTable, contentValues, "${Companion.messageID} = ?", arrayOf(messageID.toString()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun deleteThread(threadId: Long) {
|
||||||
|
val database = databaseHelper.writableDatabase
|
||||||
|
try {
|
||||||
|
val messages = mutableSetOf<Pair<Long,Long>>()
|
||||||
|
database.get(messageThreadMappingTable, "${Companion.threadID} = ?", arrayOf(threadId.toString())) { cursor ->
|
||||||
|
// for each add
|
||||||
|
while (cursor.moveToNext()) {
|
||||||
|
messages.add(cursor.getLong(Companion.messageID) to cursor.getLong(Companion.serverID))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var deletedCount = 0L
|
||||||
|
database.beginTransaction()
|
||||||
|
messages.forEach { (messageId, serverId) ->
|
||||||
|
deletedCount += database.delete(messageIDTable, "${Companion.messageID} = ? AND ${Companion.serverID} = ?", arrayOf(messageId.toString(), serverId.toString()))
|
||||||
|
}
|
||||||
|
val mappingDeleted = database.delete(messageThreadMappingTable, "${Companion.threadID} = ?", arrayOf(threadId.toString()))
|
||||||
|
database.setTransactionSuccessful()
|
||||||
|
} finally {
|
||||||
|
database.endTransaction()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
@ -73,4 +73,12 @@ class LokiThreadDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
|
|||||||
contentValues.put(publicChat, JsonUtil.toJson(openGroupV2.toJson()))
|
contentValues.put(publicChat, JsonUtil.toJson(openGroupV2.toJson()))
|
||||||
database.insertOrUpdate(publicChatTable, contentValues, "${Companion.threadID} = ?", arrayOf(threadID.toString()))
|
database.insertOrUpdate(publicChatTable, contentValues, "${Companion.threadID} = ?", arrayOf(threadID.toString()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun removeOpenGroupChat(threadID: Long) {
|
||||||
|
if (threadID < 0) return
|
||||||
|
|
||||||
|
val database = databaseHelper.writableDatabase
|
||||||
|
database.delete(publicChatTable,"${Companion.threadID} = ?", arrayOf(threadID.toString()))
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
@ -130,6 +130,7 @@ interface StorageProtocol {
|
|||||||
fun getThreadId(recipient: Recipient): Long?
|
fun getThreadId(recipient: Recipient): Long?
|
||||||
fun getThreadIdForMms(mmsId: Long): Long
|
fun getThreadIdForMms(mmsId: Long): Long
|
||||||
fun getLastUpdated(threadID: Long): Long
|
fun getLastUpdated(threadID: Long): Long
|
||||||
|
fun trimThread(threadID: Long, threadLimit: Int)
|
||||||
|
|
||||||
// Contacts
|
// Contacts
|
||||||
fun getContactWithSessionID(sessionID: String): Contact?
|
fun getContactWithSessionID(sessionID: String): Contact?
|
||||||
|
@ -37,9 +37,9 @@ class JobQueue : JobDelegate {
|
|||||||
init {
|
init {
|
||||||
// Process jobs
|
// Process jobs
|
||||||
scope.launch {
|
scope.launch {
|
||||||
val rxQueue = Channel<Job>(capacity = 1024)
|
val rxQueue = Channel<Job>(capacity = 4096)
|
||||||
val txQueue = Channel<Job>(capacity = 1024)
|
val txQueue = Channel<Job>(capacity = 4096)
|
||||||
val attachmentQueue = Channel<Job>(capacity = 1024)
|
val attachmentQueue = Channel<Job>(capacity = 4096)
|
||||||
|
|
||||||
val receiveJob = processWithDispatcher(rxQueue, rxDispatcher)
|
val receiveJob = processWithDispatcher(rxQueue, rxDispatcher)
|
||||||
val txJob = processWithDispatcher(txQueue, txDispatcher)
|
val txJob = processWithDispatcher(txQueue, txDispatcher)
|
||||||
@ -50,7 +50,7 @@ class JobQueue : JobDelegate {
|
|||||||
when (job) {
|
when (job) {
|
||||||
is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> txQueue.send(job)
|
is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> txQueue.send(job)
|
||||||
is AttachmentDownloadJob -> attachmentQueue.send(job)
|
is AttachmentDownloadJob -> attachmentQueue.send(job)
|
||||||
is MessageReceiveJob -> rxQueue.send(job)
|
is MessageReceiveJob, is TrimThreadJob -> rxQueue.send(job)
|
||||||
else -> throw IllegalStateException("Unexpected job type.")
|
else -> throw IllegalStateException("Unexpected job type.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -10,7 +10,8 @@ class SessionJobManagerFactories {
|
|||||||
AttachmentUploadJob.KEY to AttachmentUploadJob.Factory(),
|
AttachmentUploadJob.KEY to AttachmentUploadJob.Factory(),
|
||||||
MessageReceiveJob.KEY to MessageReceiveJob.Factory(),
|
MessageReceiveJob.KEY to MessageReceiveJob.Factory(),
|
||||||
MessageSendJob.KEY to MessageSendJob.Factory(),
|
MessageSendJob.KEY to MessageSendJob.Factory(),
|
||||||
NotifyPNServerJob.KEY to NotifyPNServerJob.Factory()
|
NotifyPNServerJob.KEY to NotifyPNServerJob.Factory(),
|
||||||
|
TrimThreadJob.KEY to TrimThreadJob.Factory()
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,47 @@
|
|||||||
|
package org.session.libsession.messaging.jobs
|
||||||
|
|
||||||
|
import org.session.libsession.messaging.MessagingModuleConfiguration
|
||||||
|
import org.session.libsession.messaging.utilities.Data
|
||||||
|
import org.session.libsession.utilities.TextSecurePreferences
|
||||||
|
|
||||||
|
class TrimThreadJob(val threadId: Long) : Job {
|
||||||
|
companion object {
|
||||||
|
const val KEY: String = "TrimThreadJob"
|
||||||
|
|
||||||
|
const val THREAD_ID = "thread_id"
|
||||||
|
}
|
||||||
|
|
||||||
|
override var delegate: JobDelegate? = null
|
||||||
|
override var id: String? = null
|
||||||
|
override var failureCount: Int = 0
|
||||||
|
|
||||||
|
override val maxFailureCount: Int = 1
|
||||||
|
|
||||||
|
override fun execute() {
|
||||||
|
val context = MessagingModuleConfiguration.shared.context
|
||||||
|
val trimmingEnabled = TextSecurePreferences.isThreadLengthTrimmingEnabled(context)
|
||||||
|
val threadLengthLimit = TextSecurePreferences.getThreadTrimLength(context)
|
||||||
|
|
||||||
|
if (trimmingEnabled) {
|
||||||
|
MessagingModuleConfiguration.shared.storage.trimThread(threadId, threadLengthLimit)
|
||||||
|
}
|
||||||
|
|
||||||
|
delegate?.handleJobSucceeded(this)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun serialize(): Data {
|
||||||
|
return Data.Builder()
|
||||||
|
.putLong(THREAD_ID, threadId)
|
||||||
|
.build()
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun getFactoryKey(): String = "TrimThreadJob"
|
||||||
|
|
||||||
|
class Factory : Job.Factory<TrimThreadJob> {
|
||||||
|
|
||||||
|
override fun create(data: Data): TrimThreadJob {
|
||||||
|
return TrimThreadJob(data.getLong(THREAD_ID))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -258,9 +258,6 @@ object OpenGroupAPIV2 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun parseMessages(room: String, server: String, rawMessages: List<Map<*, *>>): List<OpenGroupMessageV2> {
|
private fun parseMessages(room: String, server: String, rawMessages: List<Map<*, *>>): List<OpenGroupMessageV2> {
|
||||||
val storage = MessagingModuleConfiguration.shared.storage
|
|
||||||
val lastMessageServerID = storage.getLastMessageServerID(room, server) ?: 0
|
|
||||||
var currentLastMessageServerID = lastMessageServerID
|
|
||||||
val messages = rawMessages.mapNotNull { json ->
|
val messages = rawMessages.mapNotNull { json ->
|
||||||
json as Map<String, Any>
|
json as Map<String, Any>
|
||||||
try {
|
try {
|
||||||
@ -275,15 +272,11 @@ object OpenGroupAPIV2 {
|
|||||||
Log.d("Loki", "Ignoring message with invalid signature.")
|
Log.d("Loki", "Ignoring message with invalid signature.")
|
||||||
return@mapNotNull null
|
return@mapNotNull null
|
||||||
}
|
}
|
||||||
if (message.serverID > lastMessageServerID) {
|
|
||||||
currentLastMessageServerID = message.serverID
|
|
||||||
}
|
|
||||||
message
|
message
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
null
|
null
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
storage.setLastMessageServerID(room, server, currentLastMessageServerID)
|
|
||||||
return messages
|
return messages
|
||||||
}
|
}
|
||||||
// endregion
|
// endregion
|
||||||
@ -404,11 +397,6 @@ object OpenGroupAPIV2 {
|
|||||||
val type = TypeFactory.defaultInstance().constructCollectionType(List::class.java, MessageDeletion::class.java)
|
val type = TypeFactory.defaultInstance().constructCollectionType(List::class.java, MessageDeletion::class.java)
|
||||||
val idsAsString = JsonUtil.toJson(json["deletions"])
|
val idsAsString = JsonUtil.toJson(json["deletions"])
|
||||||
val deletedServerIDs = JsonUtil.fromJson<List<MessageDeletion>>(idsAsString, type) ?: throw Error.ParsingFailed
|
val deletedServerIDs = JsonUtil.fromJson<List<MessageDeletion>>(idsAsString, type) ?: throw Error.ParsingFailed
|
||||||
val lastDeletionServerID = storage.getLastDeletionServerID(roomID, server) ?: 0
|
|
||||||
val serverID = deletedServerIDs.maxByOrNull { it.id } ?: MessageDeletion.empty
|
|
||||||
if (serverID.id > lastDeletionServerID) {
|
|
||||||
storage.setLastDeletionServerID(roomID, server, serverID.id)
|
|
||||||
}
|
|
||||||
// Messages
|
// Messages
|
||||||
val rawMessages = json["messages"] as? List<Map<String, Any>> ?: return@mapNotNull null
|
val rawMessages = json["messages"] as? List<Map<String, Any>> ?: return@mapNotNull null
|
||||||
val messages = parseMessages(roomID, server, rawMessages)
|
val messages = parseMessages(roomID, server, rawMessages)
|
||||||
|
@ -5,6 +5,7 @@ import nl.komponents.kovenant.functional.map
|
|||||||
import org.session.libsession.messaging.MessagingModuleConfiguration
|
import org.session.libsession.messaging.MessagingModuleConfiguration
|
||||||
import org.session.libsession.messaging.jobs.JobQueue
|
import org.session.libsession.messaging.jobs.JobQueue
|
||||||
import org.session.libsession.messaging.jobs.MessageReceiveJob
|
import org.session.libsession.messaging.jobs.MessageReceiveJob
|
||||||
|
import org.session.libsession.messaging.jobs.TrimThreadJob
|
||||||
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
|
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
|
||||||
import org.session.libsession.messaging.open_groups.OpenGroupMessageV2
|
import org.session.libsession.messaging.open_groups.OpenGroupMessageV2
|
||||||
import org.session.libsession.utilities.Address
|
import org.session.libsession.utilities.Address
|
||||||
@ -15,6 +16,7 @@ import org.session.libsignal.utilities.successBackground
|
|||||||
import java.util.concurrent.ScheduledExecutorService
|
import java.util.concurrent.ScheduledExecutorService
|
||||||
import java.util.concurrent.ScheduledFuture
|
import java.util.concurrent.ScheduledFuture
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
|
import kotlin.math.max
|
||||||
|
|
||||||
class OpenGroupPollerV2(private val server: String, private val executorService: ScheduledExecutorService?) {
|
class OpenGroupPollerV2(private val server: String, private val executorService: ScheduledExecutorService?) {
|
||||||
var hasStarted = false
|
var hasStarted = false
|
||||||
@ -44,8 +46,8 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
|
|||||||
return OpenGroupAPIV2.compactPoll(rooms, server).successBackground { responses ->
|
return OpenGroupAPIV2.compactPoll(rooms, server).successBackground { responses ->
|
||||||
responses.forEach { (room, response) ->
|
responses.forEach { (room, response) ->
|
||||||
val openGroupID = "$server.$room"
|
val openGroupID = "$server.$room"
|
||||||
handleNewMessages(openGroupID, response.messages, isBackgroundPoll)
|
handleNewMessages(room, openGroupID, response.messages, isBackgroundPoll)
|
||||||
handleDeletedMessages(openGroupID, response.deletions)
|
handleDeletedMessages(room, openGroupID, response.deletions)
|
||||||
if (secondToLastJob == null && !isCaughtUp) {
|
if (secondToLastJob == null && !isCaughtUp) {
|
||||||
isCaughtUp = true
|
isCaughtUp = true
|
||||||
}
|
}
|
||||||
@ -55,8 +57,13 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
|
|||||||
}.map { }
|
}.map { }
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun handleNewMessages(openGroupID: String, messages: List<OpenGroupMessageV2>, isBackgroundPoll: Boolean) {
|
private fun handleNewMessages(room: String, openGroupID: String, messages: List<OpenGroupMessageV2>, isBackgroundPoll: Boolean) {
|
||||||
if (!hasStarted) { return }
|
val storage = MessagingModuleConfiguration.shared.storage
|
||||||
|
val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())
|
||||||
|
// check thread still exists
|
||||||
|
val threadId = storage.getThreadId(Address.fromSerialized(groupID)) ?: -1
|
||||||
|
val threadExists = threadId >= 0
|
||||||
|
if (!hasStarted || !threadExists) { return }
|
||||||
var latestJob: MessageReceiveJob? = null
|
var latestJob: MessageReceiveJob? = null
|
||||||
messages.sortedBy { it.serverID!! }.forEach { message ->
|
messages.sortedBy { it.serverID!! }.forEach { message ->
|
||||||
try {
|
try {
|
||||||
@ -82,9 +89,15 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
|
|||||||
Log.e("Loki", "Exception parsing message", e)
|
Log.e("Loki", "Exception parsing message", e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
val currentLastMessageServerID = storage.getLastMessageServerID(room, server) ?: 0
|
||||||
|
val actualMax = max(messages.mapNotNull { it.serverID }.maxOrNull() ?: 0, currentLastMessageServerID)
|
||||||
|
if (actualMax > 0) {
|
||||||
|
storage.setLastMessageServerID(room, server, actualMax)
|
||||||
|
}
|
||||||
|
JobQueue.shared.add(TrimThreadJob(threadId))
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun handleDeletedMessages(openGroupID: String, deletedMessageServerIDs: List<Long>) {
|
private fun handleDeletedMessages(room: String, openGroupID: String, deletedMessageServerIDs: List<Long>) {
|
||||||
val storage = MessagingModuleConfiguration.shared.storage
|
val storage = MessagingModuleConfiguration.shared.storage
|
||||||
val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider
|
val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider
|
||||||
val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())
|
val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())
|
||||||
@ -99,5 +112,10 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
|
|||||||
deletedMessageIDs.forEach { (messageId, isSms) ->
|
deletedMessageIDs.forEach { (messageId, isSms) ->
|
||||||
MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms)
|
MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms)
|
||||||
}
|
}
|
||||||
|
val currentMax = storage.getLastDeletionServerID(room, server) ?: 0L
|
||||||
|
val latestMax = deletedMessageServerIDs.maxOrNull() ?: 0L
|
||||||
|
if (latestMax > currentMax && latestMax != 0L) {
|
||||||
|
storage.setLastDeletionServerID(room, server, latestMax)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user