From e9e67068cdfa6ed7bf6bfb82860cc6ff0027c2e7 Mon Sep 17 00:00:00 2001 From: SessionHero01 <180888785+SessionHero01@users.noreply.github.com> Date: Tue, 1 Oct 2024 14:35:04 +1000 Subject: [PATCH] Job queue improvement --- .../libsession/messaging/jobs/JobQueue.kt | 22 +++++------- .../messaging/jobs/MessageSendJob.kt | 36 +++++++------------ .../sending_receiving/MessageSender.kt | 2 -- 3 files changed, 21 insertions(+), 39 deletions(-) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index 8e20044bfc..288e6778ed 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -3,34 +3,28 @@ package org.session.libsession.messaging.jobs import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED import kotlinx.coroutines.isActive import kotlinx.coroutines.launch -import kotlinx.coroutines.plus import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsignal.utilities.Log import java.util.Timer import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicInteger import kotlin.concurrent.schedule import kotlin.math.min import kotlin.math.pow import kotlin.math.roundToLong +@OptIn(ExperimentalCoroutinesApi::class) class JobQueue : JobDelegate { private var hasResumedPendingJobs = false // Just for debugging private val jobTimestampMap = ConcurrentHashMap() - private val rxDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() - private val rxMediaDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher() - private val openGroupDispatcher = Executors.newFixedThreadPool(8).asCoroutineDispatcher() - private val txDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() - - private val scope = CoroutineScope(Dispatchers.Default) + SupervisorJob() + private val scope = GlobalScope private val queue = Channel(UNLIMITED) private val pendingJobIds = mutableSetOf() @@ -117,10 +111,10 @@ class JobQueue : JobDelegate { val mediaQueue = Channel(capacity = UNLIMITED) val openGroupQueue = Channel(capacity = UNLIMITED) - val receiveJob = processWithDispatcher(rxQueue, rxDispatcher, "rx", asynchronous = false) - val txJob = processWithDispatcher(txQueue, txDispatcher, "tx") - val mediaJob = processWithDispatcher(mediaQueue, rxMediaDispatcher, "media") - val openGroupJob = processWithOpenGroupDispatcher(openGroupQueue, openGroupDispatcher, "openGroup") + val receiveJob = processWithDispatcher(rxQueue, Dispatchers.Default.limitedParallelism(1), "rx", asynchronous = false) + val txJob = processWithDispatcher(txQueue, Dispatchers.Default.limitedParallelism(1), "tx") + val mediaJob = processWithDispatcher(mediaQueue, Dispatchers.Default.limitedParallelism(4), "media") + val openGroupJob = processWithOpenGroupDispatcher(openGroupQueue, Dispatchers.Default.limitedParallelism(8), "openGroup") while (isActive) { when (val job = queue.receive()) { diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt index 52d56184cc..a85b4b73d1 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt @@ -3,6 +3,7 @@ package org.session.libsession.messaging.jobs import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output +import kotlinx.coroutines.withTimeout import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.jobs.Job.Companion.MAX_BUFFER_SIZE_BYTES import org.session.libsession.messaging.messages.Destination @@ -10,6 +11,7 @@ import org.session.libsession.messaging.messages.Message import org.session.libsession.messaging.messages.visible.VisibleMessage import org.session.libsession.messaging.sending_receiving.MessageSender import org.session.libsession.messaging.utilities.Data +import org.session.libsession.snode.utilities.await import org.session.libsignal.utilities.HTTP import org.session.libsignal.utilities.Log @@ -65,33 +67,21 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { } // Wait for all attachments to upload before continuing } val isSync = destination is Destination.Contact && destination.publicKey == sender - val promise = MessageSender.send(this.message, this.destination, isSync).success { - this.handleSuccess(dispatcherName) - }.fail { exception -> - var logStacktrace = true - when (exception) { - // No need for the stack trace for HTTP errors - is HTTP.HTTPRequestFailedException -> { - logStacktrace = false - - if (exception.statusCode == 429) { this.handlePermanentFailure(dispatcherName, exception) } - else { this.handleFailure(dispatcherName, exception) } - } - is MessageSender.Error -> { - if (!exception.isRetryable) { this.handlePermanentFailure(dispatcherName, exception) } - else { this.handleFailure(dispatcherName, exception) } - } - else -> this.handleFailure(dispatcherName, exception) + try { + withTimeout(20_000L) { + MessageSender.send(this@MessageSendJob.message, destination, isSync).await() } - if (logStacktrace) { Log.e(TAG, "Couldn't send message due to error", exception) } - else { Log.e(TAG, "Couldn't send message due to error: ${exception.message}") } - } - try { - promise.get() + this.handleSuccess(dispatcherName) + } catch (e: HTTP.HTTPRequestFailedException) { + if (e.statusCode == 429) { this.handlePermanentFailure(dispatcherName, e) } + else { this.handleFailure(dispatcherName, e) } + } catch (e: MessageSender.Error) { + if (!e.isRetryable) { this.handlePermanentFailure(dispatcherName, e) } + else { this.handleFailure(dispatcherName, e) } } catch (e: Exception) { - Log.d(TAG, "Promise failed to resolve successfully", e) + this.handleFailure(dispatcherName, e) } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt index b196cc93ac..7271f1e216 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt @@ -25,7 +25,6 @@ import org.session.libsession.messaging.open_groups.OpenGroupApi.Capability import org.session.libsession.messaging.open_groups.OpenGroupMessage import org.session.libsession.messaging.utilities.MessageWrapper import org.session.libsession.messaging.utilities.SodiumUtilities -import org.session.libsession.snode.OwnedSwarmAuth import org.session.libsession.snode.RawResponsePromise import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeAPI.nowWithOffset @@ -40,7 +39,6 @@ import org.session.libsignal.protos.SignalServiceProtos import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.Base64 import org.session.libsignal.utilities.IdPrefix -import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Namespace import org.session.libsignal.utilities.defaultRequiresAuth import org.session.libsignal.utilities.hasNamespaces