Merge pull request #663 from hjubb/batch_open_group_messages

Improved Open Group Polling Performance
This commit is contained in:
Niels Andriesse 2021-07-15 14:33:28 +10:00 committed by GitHub
commit f6590afd44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 10 additions and 23 deletions

View File

@ -25,12 +25,12 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor
@JvmStatic @JvmStatic
fun schedulePeriodic(context: Context) { fun schedulePeriodic(context: Context) {
Log.v(TAG, "Scheduling periodic work.") Log.v(TAG, "Scheduling periodic work.")
val builder = PeriodicWorkRequestBuilder<BackgroundPollWorker>(5, TimeUnit.MINUTES) val builder = PeriodicWorkRequestBuilder<BackgroundPollWorker>(15, TimeUnit.MINUTES)
builder.setConstraints(Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build()) builder.setConstraints(Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build())
val workRequest = builder.build() val workRequest = builder.build()
WorkManager.getInstance(context).enqueueUniquePeriodicWork( WorkManager.getInstance(context).enqueueUniquePeriodicWork(
TAG, TAG,
ExistingPeriodicWorkPolicy.REPLACE, ExistingPeriodicWorkPolicy.KEEP,
workRequest workRequest
) )
} }

View File

@ -21,8 +21,6 @@ class MessageReceiveJob(val data: ByteArray, val openGroupMessageServerID: Long?
// Keys used for database storage // Keys used for database storage
private val DATA_KEY = "data" private val DATA_KEY = "data"
// FIXME: We probably shouldn't be using this job when background polling
private val IS_BACKGROUND_POLL_KEY = "is_background_poll"
private val OPEN_GROUP_MESSAGE_SERVER_ID_KEY = "openGroupMessageServerID" private val OPEN_GROUP_MESSAGE_SERVER_ID_KEY = "openGroupMessageServerID"
private val OPEN_GROUP_ID_KEY = "open_group_id" private val OPEN_GROUP_ID_KEY = "open_group_id"
} }
@ -35,7 +33,7 @@ class MessageReceiveJob(val data: ByteArray, val openGroupMessageServerID: Long?
val deferred = deferred<Unit, Exception>() val deferred = deferred<Unit, Exception>()
try { try {
val isRetry: Boolean = failureCount != 0 val isRetry: Boolean = failureCount != 0
val (message, proto) = MessageReceiver.parse(this.data, this.openGroupMessageServerID, isRetry) val (message, proto) = MessageReceiver.parse(this.data, this.openGroupMessageServerID)
synchronized(RECEIVE_LOCK) { // FIXME: Do we need this? synchronized(RECEIVE_LOCK) { // FIXME: Do we need this?
MessageReceiver.handle(message, proto, this.openGroupID) MessageReceiver.handle(message, proto, this.openGroupID)
} }

View File

@ -4,7 +4,6 @@ import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.messages.Message import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.control.* import org.session.libsession.messaging.messages.control.*
import org.session.libsession.messaging.messages.visible.VisibleMessage import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.utilities.GroupUtil
import org.session.libsignal.crypto.PushTransportDetails import org.session.libsignal.crypto.PushTransportDetails
import org.session.libsignal.protos.SignalServiceProtos import org.session.libsignal.protos.SignalServiceProtos
@ -32,7 +31,7 @@ object MessageReceiver {
} }
} }
internal fun parse(data: ByteArray, openGroupServerID: Long?, isRetry: Boolean = false): Pair<Message, SignalServiceProtos.Content> { internal fun parse(data: ByteArray, openGroupServerID: Long?): Pair<Message, SignalServiceProtos.Content> {
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
val userPublicKey = storage.getUserPublicKey() val userPublicKey = storage.getUserPublicKey()
val isOpenGroupMessage = (openGroupServerID != null) val isOpenGroupMessage = (openGroupServerID != null)

View File

@ -8,6 +8,8 @@ import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.jobs.TrimThreadJob import org.session.libsession.messaging.jobs.TrimThreadJob
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2 import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
import org.session.libsession.messaging.open_groups.OpenGroupMessageV2 import org.session.libsession.messaging.open_groups.OpenGroupMessageV2
import org.session.libsession.messaging.sending_receiving.MessageReceiver
import org.session.libsession.messaging.sending_receiving.handle
import org.session.libsession.utilities.Address import org.session.libsession.utilities.Address
import org.session.libsession.utilities.GroupUtil import org.session.libsession.utilities.GroupUtil
import org.session.libsignal.protos.SignalServiceProtos import org.session.libsignal.protos.SignalServiceProtos
@ -64,7 +66,6 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
val threadId = storage.getThreadId(Address.fromSerialized(groupID)) ?: -1 val threadId = storage.getThreadId(Address.fromSerialized(groupID)) ?: -1
val threadExists = threadId >= 0 val threadExists = threadId >= 0
if (!hasStarted || !threadExists) { return } if (!hasStarted || !threadExists) { return }
var latestJob: MessageReceiveJob? = null
messages.sortedBy { it.serverID!! }.forEach { message -> messages.sortedBy { it.serverID!! }.forEach { message ->
try { try {
val senderPublicKey = message.sender!! val senderPublicKey = message.sender!!
@ -75,20 +76,13 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
builder.content = message.toProto().toByteString() builder.content = message.toProto().toByteString()
builder.timestamp = message.sentTimestamp builder.timestamp = message.sentTimestamp
val envelope = builder.build() val envelope = builder.build()
val job = MessageReceiveJob(envelope.toByteArray(), message.serverID, openGroupID) val (parsedMessage, content) = MessageReceiver.parse(envelope.toByteArray(), message.serverID)
if (isBackgroundPoll) { MessageReceiver.handle(parsedMessage, content, openGroupID)
job.executeAsync()
} else {
JobQueue.shared.add(job)
if (!isCaughtUp) {
secondToLastJob = latestJob
}
latestJob = job
}
} catch (e: Exception) { } catch (e: Exception) {
Log.e("Loki", "Exception parsing message", e) Log.e("Loki", "Exception parsing message", e)
} }
} }
val currentLastMessageServerID = storage.getLastMessageServerID(room, server) ?: 0 val currentLastMessageServerID = storage.getLastMessageServerID(room, server) ?: 0
val actualMax = max(messages.mapNotNull { it.serverID }.maxOrNull() ?: 0, currentLastMessageServerID) val actualMax = max(messages.mapNotNull { it.serverID }.maxOrNull() ?: 0, currentLastMessageServerID)
if (actualMax > 0) { if (actualMax > 0) {
@ -105,11 +99,7 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray()) val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())
val threadID = storage.getThreadId(Address.fromSerialized(groupID)) ?: return val threadID = storage.getThreadId(Address.fromSerialized(groupID)) ?: return
val deletedMessageIDs = deletions.mapNotNull { deletion -> val deletedMessageIDs = deletions.mapNotNull { deletion ->
val messageID = dataProvider.getMessageID(deletion.deletedMessageServerID, threadID) dataProvider.getMessageID(deletion.deletedMessageServerID, threadID)
if (messageID == null) {
Log.d("Loki", "Couldn't find message ID for message with serverID: ${deletion.deletedMessageServerID}.")
}
messageID
} }
deletedMessageIDs.forEach { (messageId, isSms) -> deletedMessageIDs.forEach { (messageId, isSms) ->
MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms) MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms)