Merge branch 'dev' of https://github.com/oxen-io/session-android into ONS-name-1

This commit is contained in:
Ryan ZHAO
2021-05-27 16:52:12 +10:00
16 changed files with 124 additions and 41 deletions

View File

@@ -130,6 +130,7 @@ interface StorageProtocol {
fun getThreadId(recipient: Recipient): Long?
fun getThreadIdForMms(mmsId: Long): Long
fun getLastUpdated(threadID: Long): Long
fun trimThread(threadID: Long, threadLimit: Int)
// Contacts
fun getContactWithSessionID(sessionID: String): Contact?

View File

@@ -37,9 +37,9 @@ class JobQueue : JobDelegate {
init {
// Process jobs
scope.launch {
val rxQueue = Channel<Job>(capacity = 1024)
val txQueue = Channel<Job>(capacity = 1024)
val attachmentQueue = Channel<Job>(capacity = 1024)
val rxQueue = Channel<Job>(capacity = 4096)
val txQueue = Channel<Job>(capacity = 4096)
val attachmentQueue = Channel<Job>(capacity = 4096)
val receiveJob = processWithDispatcher(rxQueue, rxDispatcher)
val txJob = processWithDispatcher(txQueue, txDispatcher)
@@ -50,7 +50,7 @@ class JobQueue : JobDelegate {
when (job) {
is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> txQueue.send(job)
is AttachmentDownloadJob -> attachmentQueue.send(job)
is MessageReceiveJob -> rxQueue.send(job)
is MessageReceiveJob, is TrimThreadJob -> rxQueue.send(job)
else -> throw IllegalStateException("Unexpected job type.")
}
}

View File

@@ -10,7 +10,8 @@ class SessionJobManagerFactories {
AttachmentUploadJob.KEY to AttachmentUploadJob.Factory(),
MessageReceiveJob.KEY to MessageReceiveJob.Factory(),
MessageSendJob.KEY to MessageSendJob.Factory(),
NotifyPNServerJob.KEY to NotifyPNServerJob.Factory()
NotifyPNServerJob.KEY to NotifyPNServerJob.Factory(),
TrimThreadJob.KEY to TrimThreadJob.Factory()
)
}
}

View File

@@ -0,0 +1,44 @@
package org.session.libsession.messaging.jobs
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.utilities.TextSecurePreferences
class TrimThreadJob(val threadId: Long) : Job {
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
override val maxFailureCount: Int = 1
companion object {
const val KEY: String = "TrimThreadJob"
const val THREAD_ID = "thread_id"
}
override fun execute() {
val context = MessagingModuleConfiguration.shared.context
val trimmingEnabled = TextSecurePreferences.isThreadLengthTrimmingEnabled(context)
val threadLengthLimit = TextSecurePreferences.getThreadTrimLength(context)
if (trimmingEnabled) {
MessagingModuleConfiguration.shared.storage.trimThread(threadId, threadLengthLimit)
}
delegate?.handleJobSucceeded(this)
}
override fun serialize(): Data {
return Data.Builder()
.putLong(THREAD_ID, threadId)
.build()
}
override fun getFactoryKey(): String = "TrimThreadJob"
class Factory : Job.Factory<TrimThreadJob> {
override fun create(data: Data): TrimThreadJob {
return TrimThreadJob(data.getLong(THREAD_ID))
}
}
}

View File

@@ -258,9 +258,6 @@ object OpenGroupAPIV2 {
}
private fun parseMessages(room: String, server: String, rawMessages: List<Map<*, *>>): List<OpenGroupMessageV2> {
val storage = MessagingModuleConfiguration.shared.storage
val lastMessageServerID = storage.getLastMessageServerID(room, server) ?: 0
var currentLastMessageServerID = lastMessageServerID
val messages = rawMessages.mapNotNull { json ->
json as Map<String, Any>
try {
@@ -275,15 +272,11 @@ object OpenGroupAPIV2 {
Log.d("Loki", "Ignoring message with invalid signature.")
return@mapNotNull null
}
if (message.serverID > lastMessageServerID) {
currentLastMessageServerID = message.serverID
}
message
} catch (e: Exception) {
null
}
}
storage.setLastMessageServerID(room, server, currentLastMessageServerID)
return messages
}
// endregion
@@ -404,11 +397,6 @@ object OpenGroupAPIV2 {
val type = TypeFactory.defaultInstance().constructCollectionType(List::class.java, MessageDeletion::class.java)
val idsAsString = JsonUtil.toJson(json["deletions"])
val deletedServerIDs = JsonUtil.fromJson<List<MessageDeletion>>(idsAsString, type) ?: throw Error.ParsingFailed
val lastDeletionServerID = storage.getLastDeletionServerID(roomID, server) ?: 0
val serverID = deletedServerIDs.maxByOrNull { it.id } ?: MessageDeletion.empty
if (serverID.id > lastDeletionServerID) {
storage.setLastDeletionServerID(roomID, server, serverID.id)
}
// Messages
val rawMessages = json["messages"] as? List<Map<String, Any>> ?: return@mapNotNull null
val messages = parseMessages(roomID, server, rawMessages)

View File

@@ -5,6 +5,7 @@ import nl.komponents.kovenant.functional.map
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.jobs.TrimThreadJob
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
import org.session.libsession.messaging.open_groups.OpenGroupMessageV2
import org.session.libsession.utilities.Address
@@ -15,6 +16,7 @@ import org.session.libsignal.utilities.successBackground
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import kotlin.math.max
class OpenGroupPollerV2(private val server: String, private val executorService: ScheduledExecutorService?) {
var hasStarted = false
@@ -44,8 +46,8 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
return OpenGroupAPIV2.compactPoll(rooms, server).successBackground { responses ->
responses.forEach { (room, response) ->
val openGroupID = "$server.$room"
handleNewMessages(openGroupID, response.messages, isBackgroundPoll)
handleDeletedMessages(openGroupID, response.deletions)
handleNewMessages(room, openGroupID, response.messages, isBackgroundPoll)
handleDeletedMessages(room, openGroupID, response.deletions)
if (secondToLastJob == null && !isCaughtUp) {
isCaughtUp = true
}
@@ -55,8 +57,13 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
}.map { }
}
private fun handleNewMessages(openGroupID: String, messages: List<OpenGroupMessageV2>, isBackgroundPoll: Boolean) {
if (!hasStarted) { return }
private fun handleNewMessages(room: String, openGroupID: String, messages: List<OpenGroupMessageV2>, isBackgroundPoll: Boolean) {
val storage = MessagingModuleConfiguration.shared.storage
val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())
// check thread still exists
val threadId = storage.getThreadId(Address.fromSerialized(groupID)) ?: -1
val threadExists = threadId >= 0
if (!hasStarted || !threadExists) { return }
var latestJob: MessageReceiveJob? = null
messages.sortedBy { it.serverID!! }.forEach { message ->
try {
@@ -82,9 +89,15 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
Log.e("Loki", "Exception parsing message", e)
}
}
val currentLastMessageServerID = storage.getLastMessageServerID(room, server) ?: 0
val actualMax = max(messages.mapNotNull { it.serverID }.maxOrNull() ?: 0, currentLastMessageServerID)
if (actualMax > 0) {
storage.setLastMessageServerID(room, server, actualMax)
}
JobQueue.shared.add(TrimThreadJob(threadId))
}
private fun handleDeletedMessages(openGroupID: String, deletedMessageServerIDs: List<Long>) {
private fun handleDeletedMessages(room: String, openGroupID: String, deletedMessageServerIDs: List<Long>) {
val storage = MessagingModuleConfiguration.shared.storage
val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())
@@ -99,5 +112,10 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
deletedMessageIDs.forEach { (messageId, isSms) ->
MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms)
}
val currentMax = storage.getLastDeletionServerID(room, server) ?: 0L
val latestMax = deletedMessageServerIDs.maxOrNull() ?: 0L
if (latestMax > currentMax && latestMax != 0L) {
storage.setLastDeletionServerID(room, server, latestMax)
}
}
}