diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/BatchMessageReceiveJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/BatchMessageReceiveJob.kt index f1df08d1fa..f8a1b03ea5 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/BatchMessageReceiveJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/BatchMessageReceiveJob.kt @@ -31,6 +31,8 @@ class BatchMessageReceiveJob( const val TAG = "BatchMessageReceiveJob" const val KEY = "BatchMessageReceiveJob" + const val BATCH_DEFAULT_NUMBER = 50 + // Keys used for database storage private val NUM_MESSAGES_KEY = "numMessages" private val DATA_KEY = "data" diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPollerV2.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPollerV2.kt index 56ec3ffafc..4ce658db8e 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPollerV2.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPollerV2.kt @@ -4,8 +4,10 @@ import nl.komponents.kovenant.Promise import nl.komponents.kovenant.functional.bind import nl.komponents.kovenant.functional.map import org.session.libsession.messaging.MessagingModuleConfiguration +import org.session.libsession.messaging.jobs.BatchMessageReceiveJob import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.jobs.MessageReceiveJob +import org.session.libsession.messaging.jobs.MessageReceiveParameters import org.session.libsession.snode.SnodeAPI import org.session.libsession.utilities.GroupUtil import org.session.libsignal.crypto.getRandomElementOrNull @@ -100,8 +102,12 @@ class ClosedGroupPollerV2 { } promise.success { envelopes -> if (!isPolling(groupPublicKey)) { return@success } - envelopes.forEach { (envelope, serverHash) -> - val job = MessageReceiveJob(envelope.toByteArray(), serverHash) + + val parameters = envelopes.map { (envelope, serverHash) -> + MessageReceiveParameters(envelope.toByteArray(), serverHash = serverHash) + } + parameters.chunked(BatchMessageReceiveJob.BATCH_DEFAULT_NUMBER).forEach { chunk -> + val job = BatchMessageReceiveJob(chunk) JobQueue.shared.add(job) } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt index c604fb7f9b..844c796dfb 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt @@ -73,7 +73,7 @@ class OpenGroupPollerV2(private val server: String, private val executorService: builder.build() to message.serverID } - envelopes.chunked(20).forEach { list -> + envelopes.chunked(BatchMessageReceiveJob.BATCH_DEFAULT_NUMBER).forEach { list -> val parameters = list.map { (message, serverId) -> MessageReceiveParameters(message.toByteArray(), openGroupMessageServerID = serverId) } diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt index 0edcf0a509..ffd3bcff47 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt @@ -95,7 +95,7 @@ class Poller { val parameters = messages.map { (envelope, serverHash) -> MessageReceiveParameters(envelope.toByteArray(), serverHash = serverHash) } - parameters.chunked(20).forEach { chunk -> + parameters.chunked(BatchMessageReceiveJob.BATCH_DEFAULT_NUMBER).forEach { chunk -> val job = BatchMessageReceiveJob(chunk) JobQueue.shared.add(job) }