Job queue improvement

This commit is contained in:
SessionHero01 2024-10-01 14:35:04 +10:00
parent b9f5f940a1
commit e9e67068cd
No known key found for this signature in database
3 changed files with 21 additions and 39 deletions

View File

@ -3,34 +3,28 @@ package org.session.libsession.messaging.jobs
import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.isActive import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.plus
import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Log
import java.util.Timer import java.util.Timer
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.schedule import kotlin.concurrent.schedule
import kotlin.math.min import kotlin.math.min
import kotlin.math.pow import kotlin.math.pow
import kotlin.math.roundToLong import kotlin.math.roundToLong
@OptIn(ExperimentalCoroutinesApi::class)
class JobQueue : JobDelegate { class JobQueue : JobDelegate {
private var hasResumedPendingJobs = false // Just for debugging private var hasResumedPendingJobs = false // Just for debugging
private val jobTimestampMap = ConcurrentHashMap<Long, AtomicInteger>() private val jobTimestampMap = ConcurrentHashMap<Long, AtomicInteger>()
private val rxDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() private val scope = GlobalScope
private val rxMediaDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
private val openGroupDispatcher = Executors.newFixedThreadPool(8).asCoroutineDispatcher()
private val txDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private val scope = CoroutineScope(Dispatchers.Default) + SupervisorJob()
private val queue = Channel<Job>(UNLIMITED) private val queue = Channel<Job>(UNLIMITED)
private val pendingJobIds = mutableSetOf<String>() private val pendingJobIds = mutableSetOf<String>()
@ -117,10 +111,10 @@ class JobQueue : JobDelegate {
val mediaQueue = Channel<Job>(capacity = UNLIMITED) val mediaQueue = Channel<Job>(capacity = UNLIMITED)
val openGroupQueue = Channel<Job>(capacity = UNLIMITED) val openGroupQueue = Channel<Job>(capacity = UNLIMITED)
val receiveJob = processWithDispatcher(rxQueue, rxDispatcher, "rx", asynchronous = false) val receiveJob = processWithDispatcher(rxQueue, Dispatchers.Default.limitedParallelism(1), "rx", asynchronous = false)
val txJob = processWithDispatcher(txQueue, txDispatcher, "tx") val txJob = processWithDispatcher(txQueue, Dispatchers.Default.limitedParallelism(1), "tx")
val mediaJob = processWithDispatcher(mediaQueue, rxMediaDispatcher, "media") val mediaJob = processWithDispatcher(mediaQueue, Dispatchers.Default.limitedParallelism(4), "media")
val openGroupJob = processWithOpenGroupDispatcher(openGroupQueue, openGroupDispatcher, "openGroup") val openGroupJob = processWithOpenGroupDispatcher(openGroupQueue, Dispatchers.Default.limitedParallelism(8), "openGroup")
while (isActive) { while (isActive) {
when (val job = queue.receive()) { when (val job = queue.receive()) {

View File

@ -3,6 +3,7 @@ package org.session.libsession.messaging.jobs
import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.io.Output
import kotlinx.coroutines.withTimeout
import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.Job.Companion.MAX_BUFFER_SIZE_BYTES import org.session.libsession.messaging.jobs.Job.Companion.MAX_BUFFER_SIZE_BYTES
import org.session.libsession.messaging.messages.Destination import org.session.libsession.messaging.messages.Destination
@ -10,6 +11,7 @@ import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.visible.VisibleMessage import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.messaging.sending_receiving.MessageSender import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.messaging.utilities.Data import org.session.libsession.messaging.utilities.Data
import org.session.libsession.snode.utilities.await
import org.session.libsignal.utilities.HTTP import org.session.libsignal.utilities.HTTP
import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Log
@ -65,33 +67,21 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
} // Wait for all attachments to upload before continuing } // Wait for all attachments to upload before continuing
} }
val isSync = destination is Destination.Contact && destination.publicKey == sender val isSync = destination is Destination.Contact && destination.publicKey == sender
val promise = MessageSender.send(this.message, this.destination, isSync).success {
this.handleSuccess(dispatcherName)
}.fail { exception ->
var logStacktrace = true
when (exception) {
// No need for the stack trace for HTTP errors
is HTTP.HTTPRequestFailedException -> {
logStacktrace = false
if (exception.statusCode == 429) { this.handlePermanentFailure(dispatcherName, exception) }
else { this.handleFailure(dispatcherName, exception) }
}
is MessageSender.Error -> {
if (!exception.isRetryable) { this.handlePermanentFailure(dispatcherName, exception) }
else { this.handleFailure(dispatcherName, exception) }
}
else -> this.handleFailure(dispatcherName, exception)
}
if (logStacktrace) { Log.e(TAG, "Couldn't send message due to error", exception) }
else { Log.e(TAG, "Couldn't send message due to error: ${exception.message}") }
}
try { try {
promise.get() withTimeout(20_000L) {
MessageSender.send(this@MessageSendJob.message, destination, isSync).await()
}
this.handleSuccess(dispatcherName)
} catch (e: HTTP.HTTPRequestFailedException) {
if (e.statusCode == 429) { this.handlePermanentFailure(dispatcherName, e) }
else { this.handleFailure(dispatcherName, e) }
} catch (e: MessageSender.Error) {
if (!e.isRetryable) { this.handlePermanentFailure(dispatcherName, e) }
else { this.handleFailure(dispatcherName, e) }
} catch (e: Exception) { } catch (e: Exception) {
Log.d(TAG, "Promise failed to resolve successfully", e) this.handleFailure(dispatcherName, e)
} }
} }

View File

@ -25,7 +25,6 @@ import org.session.libsession.messaging.open_groups.OpenGroupApi.Capability
import org.session.libsession.messaging.open_groups.OpenGroupMessage import org.session.libsession.messaging.open_groups.OpenGroupMessage
import org.session.libsession.messaging.utilities.MessageWrapper import org.session.libsession.messaging.utilities.MessageWrapper
import org.session.libsession.messaging.utilities.SodiumUtilities import org.session.libsession.messaging.utilities.SodiumUtilities
import org.session.libsession.snode.OwnedSwarmAuth
import org.session.libsession.snode.RawResponsePromise import org.session.libsession.snode.RawResponsePromise
import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeAPI.nowWithOffset import org.session.libsession.snode.SnodeAPI.nowWithOffset
@ -40,7 +39,6 @@ import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Base64 import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.IdPrefix import org.session.libsignal.utilities.IdPrefix
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace import org.session.libsignal.utilities.Namespace
import org.session.libsignal.utilities.defaultRequiresAuth import org.session.libsignal.utilities.defaultRequiresAuth
import org.session.libsignal.utilities.hasNamespaces import org.session.libsignal.utilities.hasNamespaces