From f56a16b31d9fd7d98c886668fa6d58b9389f99c9 Mon Sep 17 00:00:00 2001 From: Harris Date: Wed, 6 Oct 2021 13:56:13 +1100 Subject: [PATCH] refactor: remove pubkey logs from poller log spam and move attachment download job to rx queue --- .../java/org/session/libsession/messaging/jobs/JobQueue.kt | 7 +------ .../sending_receiving/pollers/ClosedGroupPollerV2.kt | 4 +--- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index 0c200c588e..88510fb278 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -19,7 +19,6 @@ class JobQueue : JobDelegate { private val jobTimestampMap = ConcurrentHashMap() private val rxDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() private val txDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() - private val attachmentDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher() private val scope = GlobalScope + SupervisorJob() private val queue = Channel(UNLIMITED) private val pendingJobIds = mutableSetOf() @@ -39,18 +38,15 @@ class JobQueue : JobDelegate { scope.launch { val rxQueue = Channel(capacity = 4096) val txQueue = Channel(capacity = 4096) - val attachmentQueue = Channel(capacity = 4096) val receiveJob = processWithDispatcher(rxQueue, rxDispatcher) val txJob = processWithDispatcher(txQueue, txDispatcher) - val attachmentJob = processWithDispatcher(attachmentQueue, attachmentDispatcher) while (isActive) { for (job in queue) { when (job) { is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> txQueue.send(job) - is AttachmentDownloadJob -> attachmentQueue.send(job) - is MessageReceiveJob, is BatchMessageReceiveJob, is TrimThreadJob -> rxQueue.send(job) + is MessageReceiveJob, is TrimThreadJob, is BatchMessageReceiveJob, is AttachmentDownloadJob-> rxQueue.send(job) else -> throw IllegalStateException("Unexpected job type.") } } @@ -59,7 +55,6 @@ class JobQueue : JobDelegate { // The job has been cancelled receiveJob.cancel() txJob.cancel() - attachmentJob.cancel() } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPollerV2.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPollerV2.kt index f165c97540..56ec3ffafc 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPollerV2.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPollerV2.kt @@ -10,7 +10,6 @@ import org.session.libsession.snode.SnodeAPI import org.session.libsession.utilities.GroupUtil import org.session.libsignal.crypto.getRandomElementOrNull import org.session.libsignal.utilities.Log -import org.session.libsignal.utilities.successBackground import java.util.* import java.util.concurrent.Executors import java.util.concurrent.ScheduledFuture @@ -82,7 +81,6 @@ class ClosedGroupPollerV2 { val limit: Long = 12 * 60 * 60 * 1000 val a = (Companion.maxPollInterval - minPollInterval).toDouble() / limit.toDouble() val nextPollInterval = a * min(timeSinceLastMessage, limit) + minPollInterval - Log.d("Loki", "Next poll interval for closed group with public key: $groupPublicKey is ${nextPollInterval / 1000} s.") executorService?.schedule({ poll(groupPublicKey).success { pollRecursively(groupPublicKey) @@ -108,7 +106,7 @@ class ClosedGroupPollerV2 { } } promise.fail { - Log.d("Loki", "Polling failed for closed group with public key: $groupPublicKey due to error: $it.") + Log.d("Loki", "Polling failed for closed group due to error: $it.") } return promise.map { } }