mirror of
https://github.com/oxen-io/session-android.git
synced 2025-02-21 10:38:26 +00:00
refactor: remove pubkey logs from poller log spam and move attachment download job to rx queue
This commit is contained in:
parent
678d8094a1
commit
f56a16b31d
@ -19,7 +19,6 @@ class JobQueue : JobDelegate {
|
|||||||
private val jobTimestampMap = ConcurrentHashMap<Long, AtomicInteger>()
|
private val jobTimestampMap = ConcurrentHashMap<Long, AtomicInteger>()
|
||||||
private val rxDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
|
private val rxDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
|
||||||
private val txDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
|
private val txDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
|
||||||
private val attachmentDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
|
|
||||||
private val scope = GlobalScope + SupervisorJob()
|
private val scope = GlobalScope + SupervisorJob()
|
||||||
private val queue = Channel<Job>(UNLIMITED)
|
private val queue = Channel<Job>(UNLIMITED)
|
||||||
private val pendingJobIds = mutableSetOf<String>()
|
private val pendingJobIds = mutableSetOf<String>()
|
||||||
@ -39,18 +38,15 @@ class JobQueue : JobDelegate {
|
|||||||
scope.launch {
|
scope.launch {
|
||||||
val rxQueue = Channel<Job>(capacity = 4096)
|
val rxQueue = Channel<Job>(capacity = 4096)
|
||||||
val txQueue = Channel<Job>(capacity = 4096)
|
val txQueue = Channel<Job>(capacity = 4096)
|
||||||
val attachmentQueue = Channel<Job>(capacity = 4096)
|
|
||||||
|
|
||||||
val receiveJob = processWithDispatcher(rxQueue, rxDispatcher)
|
val receiveJob = processWithDispatcher(rxQueue, rxDispatcher)
|
||||||
val txJob = processWithDispatcher(txQueue, txDispatcher)
|
val txJob = processWithDispatcher(txQueue, txDispatcher)
|
||||||
val attachmentJob = processWithDispatcher(attachmentQueue, attachmentDispatcher)
|
|
||||||
|
|
||||||
while (isActive) {
|
while (isActive) {
|
||||||
for (job in queue) {
|
for (job in queue) {
|
||||||
when (job) {
|
when (job) {
|
||||||
is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> txQueue.send(job)
|
is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> txQueue.send(job)
|
||||||
is AttachmentDownloadJob -> attachmentQueue.send(job)
|
is MessageReceiveJob, is TrimThreadJob, is BatchMessageReceiveJob, is AttachmentDownloadJob-> rxQueue.send(job)
|
||||||
is MessageReceiveJob, is BatchMessageReceiveJob, is TrimThreadJob -> rxQueue.send(job)
|
|
||||||
else -> throw IllegalStateException("Unexpected job type.")
|
else -> throw IllegalStateException("Unexpected job type.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -59,7 +55,6 @@ class JobQueue : JobDelegate {
|
|||||||
// The job has been cancelled
|
// The job has been cancelled
|
||||||
receiveJob.cancel()
|
receiveJob.cancel()
|
||||||
txJob.cancel()
|
txJob.cancel()
|
||||||
attachmentJob.cancel()
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -10,7 +10,6 @@ import org.session.libsession.snode.SnodeAPI
|
|||||||
import org.session.libsession.utilities.GroupUtil
|
import org.session.libsession.utilities.GroupUtil
|
||||||
import org.session.libsignal.crypto.getRandomElementOrNull
|
import org.session.libsignal.crypto.getRandomElementOrNull
|
||||||
import org.session.libsignal.utilities.Log
|
import org.session.libsignal.utilities.Log
|
||||||
import org.session.libsignal.utilities.successBackground
|
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.Executors
|
import java.util.concurrent.Executors
|
||||||
import java.util.concurrent.ScheduledFuture
|
import java.util.concurrent.ScheduledFuture
|
||||||
@ -82,7 +81,6 @@ class ClosedGroupPollerV2 {
|
|||||||
val limit: Long = 12 * 60 * 60 * 1000
|
val limit: Long = 12 * 60 * 60 * 1000
|
||||||
val a = (Companion.maxPollInterval - minPollInterval).toDouble() / limit.toDouble()
|
val a = (Companion.maxPollInterval - minPollInterval).toDouble() / limit.toDouble()
|
||||||
val nextPollInterval = a * min(timeSinceLastMessage, limit) + minPollInterval
|
val nextPollInterval = a * min(timeSinceLastMessage, limit) + minPollInterval
|
||||||
Log.d("Loki", "Next poll interval for closed group with public key: $groupPublicKey is ${nextPollInterval / 1000} s.")
|
|
||||||
executorService?.schedule({
|
executorService?.schedule({
|
||||||
poll(groupPublicKey).success {
|
poll(groupPublicKey).success {
|
||||||
pollRecursively(groupPublicKey)
|
pollRecursively(groupPublicKey)
|
||||||
@ -108,7 +106,7 @@ class ClosedGroupPollerV2 {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
promise.fail {
|
promise.fail {
|
||||||
Log.d("Loki", "Polling failed for closed group with public key: $groupPublicKey due to error: $it.")
|
Log.d("Loki", "Polling failed for closed group due to error: $it.")
|
||||||
}
|
}
|
||||||
return promise.map { }
|
return promise.map { }
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user