From 8439d5711523305f378ba41c0abb75fbe0cd9301 Mon Sep 17 00:00:00 2001 From: jubb Date: Mon, 10 May 2021 17:07:10 +1000 Subject: [PATCH 1/4] refactor: let the periodic work run more frequently and never fail from excessive retries preventing from re-running. remove resume pending jobs from ApplicationContext onCreate and handle in home activity's onCreate instead. prevent some illegal argument exceptions from Random.kt by returning null if empty --- .../securesms/ApplicationContext.java | 2 -- .../loki/api/BackgroundPollWorker.kt | 29 ++++--------------- .../securesms/loki/api/PublicChatManager.kt | 5 +++- .../libsession/snode/utilities/Random.kt | 1 + .../session/libsignal/service/loki/Random.kt | 1 + 5 files changed, 11 insertions(+), 27 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index cc61634f9a..64a9929630 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -327,7 +327,6 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc .setJobStorage(new FastJobStorage(DatabaseFactory.getJobDatabase(this))) .setDependencyInjector(this) .build()); - JobQueue.getShared().resumePendingJobs(); } private void initializeDependencyInjection() { @@ -455,7 +454,6 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc poller.setUserPublicKey(userPublicKey); return; } - LokiAPIDatabase apiDB = DatabaseFactory.getLokiAPIDatabase(this); poller = new Poller(); closedGroupPoller = new ClosedGroupPoller(); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt index 7b4f2c2aa6..c020b5333d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt @@ -17,7 +17,6 @@ import org.session.libsession.snode.SnodeAPI import org.session.libsession.utilities.TextSecurePreferences import org.session.libsignal.utilities.logging.Log import org.thoughtcrime.securesms.database.DatabaseFactory -import java.io.IOException import java.util.concurrent.TimeUnit class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Worker(context, params) { @@ -25,26 +24,10 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor companion object { const val TAG = "BackgroundPollWorker" - private const val RETRY_ATTEMPTS = 3 - - @JvmStatic - fun scheduleInstant(context: Context) { - val workRequest = OneTimeWorkRequestBuilder() - .setConstraints(Constraints.Builder() - .setRequiredNetworkType(NetworkType.CONNECTED) - .build() - ) - .build() - - WorkManager - .getInstance(context) - .enqueue(workRequest) - } - @JvmStatic fun schedulePeriodic(context: Context) { Log.v(TAG, "Scheduling periodic work.") - val workRequest = PeriodicWorkRequestBuilder(15, TimeUnit.MINUTES) + val workRequest = PeriodicWorkRequestBuilder(5, TimeUnit.MINUTES) .setConstraints(Constraints.Builder() .setRequiredNetworkType(NetworkType.CONNECTED) .build() @@ -55,7 +38,7 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor .getInstance(context) .enqueueUniquePeriodicWork( TAG, - ExistingPeriodicWorkPolicy.KEEP, + ExistingPeriodicWorkPolicy.REPLACE, workRequest ) } @@ -105,9 +88,8 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor return Result.success() } catch (exception: Exception) { - Log.v(TAG, "Background poll failed due to error: ${exception.message}.", exception) - - return if (runAttemptCount < RETRY_ATTEMPTS) Result.retry() else Result.failure() + Log.e(TAG, "Background poll failed due to error: ${exception.message}.", exception) + return Result.retry() } } @@ -116,8 +98,7 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor override fun onReceive(context: Context, intent: Intent) { if (intent.action == Intent.ACTION_BOOT_COMPLETED) { Log.v(TAG, "Boot broadcast caught.") - BackgroundPollWorker.scheduleInstant(context) - BackgroundPollWorker.schedulePeriodic(context) + schedulePeriodic(context) } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt index 04ed0e4c95..22878d46fd 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt @@ -31,7 +31,7 @@ class PublicChatManager(private val context: Context) { refreshChatsAndPollers() for ((threadID, _) in chats) { val poller = pollers[threadID] - areAllCaughtUp = if (poller != null) areAllCaughtUp && poller.isCaughtUp else true + areAllCaughtUp = if (poller != null) areAllCaughtUp && poller.isCaughtUp else areAllCaughtUp } return areAllCaughtUp } @@ -42,6 +42,9 @@ class PublicChatManager(private val context: Context) { val poller = pollers[threadID] ?: OpenGroupPoller(chat, executorService) poller.isCaughtUp = false } + for ((_,poller) in v2Pollers) { + poller.isCaughtUp = false + } } public fun startPollersIfNeeded() { diff --git a/libsession/src/main/java/org/session/libsession/snode/utilities/Random.kt b/libsession/src/main/java/org/session/libsession/snode/utilities/Random.kt index 2ec42cdf5b..72ceee9f3b 100644 --- a/libsession/src/main/java/org/session/libsession/snode/utilities/Random.kt +++ b/libsession/src/main/java/org/session/libsession/snode/utilities/Random.kt @@ -6,6 +6,7 @@ import java.security.SecureRandom * Uses `SecureRandom` to pick an element from this collection. */ fun Collection.getRandomElementOrNull(): T? { + if (isEmpty()) return null val index = SecureRandom().nextInt(size) // SecureRandom() should be cryptographically secure return elementAtOrNull(index) } diff --git a/libsignal/src/main/java/org/session/libsignal/service/loki/Random.kt b/libsignal/src/main/java/org/session/libsignal/service/loki/Random.kt index 68bc4380c5..b1c1cd2af7 100644 --- a/libsignal/src/main/java/org/session/libsignal/service/loki/Random.kt +++ b/libsignal/src/main/java/org/session/libsignal/service/loki/Random.kt @@ -6,6 +6,7 @@ import java.security.SecureRandom * Uses `SecureRandom` to pick an element from this collection. */ fun Collection.getRandomElementOrNull(): T? { + if (isEmpty()) return null val index = SecureRandom().nextInt(size) // SecureRandom() should be cryptographically secure return elementAtOrNull(index) } From 9f099771605cc528a333b88d72e778ca3e3c3589 Mon Sep 17 00:00:00 2001 From: jubb Date: Wed, 12 May 2021 10:43:17 +1000 Subject: [PATCH 2/4] refactor: remove registration required for job serialization and test logs, don't try to read class object if the message send class is not of expected type --- .../securesms/loki/api/SessionProtocolImpl.kt | 1 - .../libsession/messaging/jobs/AttachmentUploadJob.kt | 1 + .../java/org/session/libsession/messaging/jobs/Job.kt | 1 + .../org/session/libsession/messaging/jobs/JobQueue.kt | 1 + .../session/libsession/messaging/jobs/MessageSendJob.kt | 9 ++++++++- .../libsession/messaging/jobs/NotifyPNServerJob.kt | 1 + .../sending_receiving/ReceivedMessageHandler.kt | 1 - .../main/java/org/session/libsession/snode/SnodeAPI.kt | 2 +- 8 files changed, 13 insertions(+), 4 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/SessionProtocolImpl.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/SessionProtocolImpl.kt index 9be7b3e461..4916ef483d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/SessionProtocolImpl.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/SessionProtocolImpl.kt @@ -23,7 +23,6 @@ class SessionProtocolImpl(private val context: Context) : SessionProtocol { override fun decrypt(ciphertext: ByteArray, x25519KeyPair: ECKeyPair): Pair { val recipientX25519PrivateKey = x25519KeyPair.privateKey.serialize() val recipientX25519PublicKey = Hex.fromStringCondensed(x25519KeyPair.hexEncodedPublicKey.removing05PrefixIfNeeded()) - Log.d("Test", "recipientX25519PublicKey: $recipientX25519PublicKey") val signatureSize = Sign.BYTES val ed25519PublicKeySize = Sign.PUBLICKEYBYTES diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt index 1d8b1a7170..c6e73e12e1 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt @@ -135,6 +135,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess override fun create(data: Data): AttachmentUploadJob { val serializedMessage = data.getByteArray(KEY_MESSAGE) val kryo = Kryo() + kryo.isRegistrationRequired = false val input = Input(serializedMessage) val message: Message = kryo.readObject(input, Message::class.java) input.close() diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt index 4693fddf4a..aefe7cc907 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt @@ -11,6 +11,7 @@ interface Job { // Keys used for database storage private val KEY_ID = "id" private val KEY_FAILURE_COUNT = "failure_count" + internal const val MAX_BUFFER_SIZE = 1_000_000 // bytes } fun execute() diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index cffb2db7d6..ba21280700 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -50,6 +50,7 @@ class JobQueue : JobDelegate { private fun Job.canExecuteParallel(): Boolean { return this.javaClass in arrayOf( + MessageSendJob::class.java, AttachmentUploadJob::class.java, AttachmentDownloadJob::class.java ) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt index 83822c4fc7..7b64f6bb77 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt @@ -4,6 +4,7 @@ import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output import org.session.libsession.messaging.MessagingModuleConfiguration +import org.session.libsession.messaging.jobs.Job.Companion.MAX_BUFFER_SIZE import org.session.libsession.messaging.messages.Destination import org.session.libsession.messaging.messages.Message import org.session.libsession.messaging.messages.visible.VisibleMessage @@ -79,7 +80,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { override fun serialize(): Data { val kryo = Kryo() kryo.isRegistrationRequired = false - val output = Output(ByteArray(4096), 10_000_000) + val output = Output(ByteArray(4096), MAX_BUFFER_SIZE) kryo.writeClassAndObject(output, message) output.close() val serializedMessage = output.toBytes() @@ -102,7 +103,13 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { val serializedMessage = data.getByteArray(KEY_MESSAGE) val serializedDestination = data.getByteArray(KEY_DESTINATION) val kryo = Kryo() + kryo.isRegistrationRequired = false var input = Input(serializedMessage) + val messageClass = kryo.readClass(input) + if (messageClass == null || !Message::class.java.isAssignableFrom(messageClass.type)) { + // if the message class doesn't exist or it doesn't implement `Message` parent class + throw Exception("deserialized messageClass was ${messageClass.type}") + } val message = kryo.readClassAndObject(input) as Message input.close() input = Input(serializedDestination) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt index fb99f54f56..720dd091ac 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt @@ -80,6 +80,7 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job { override fun create(data: Data): NotifyPNServerJob { val serializedMessage = data.getByteArray(KEY_MESSAGE) val kryo = Kryo() + kryo.isRegistrationRequired = false val input = Input(serializedMessage) val message: SnodeMessage = kryo.readObject(input, SnodeMessage::class.java) input.close() diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt index 2a0b13ae3e..c63e86e56e 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt @@ -1,7 +1,6 @@ package org.session.libsession.messaging.sending_receiving import android.text.TextUtils -import okhttp3.HttpUrl import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.jobs.AttachmentDownloadJob import org.session.libsession.messaging.jobs.JobQueue diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index a5094a63ee..cb0d5ee837 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -92,7 +92,7 @@ object SnodeAPI { "method" to "get_n_service_nodes", "params" to mapOf( "active_only" to true, - "limit" to 256, +// "limit" to 256, "fields" to mapOf( "public_ip" to true, "storage_port" to true, "pubkey_x25519" to true, "pubkey_ed25519" to true ) ) ) From 18818bf8da16b1ac87bbf491129266bea024fdeb Mon Sep 17 00:00:00 2001 From: jubb Date: Wed, 12 May 2021 11:24:08 +1000 Subject: [PATCH 3/4] refactor: re-add the node limit --- .../src/main/java/org/session/libsession/snode/SnodeAPI.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index cb0d5ee837..a5094a63ee 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -92,7 +92,7 @@ object SnodeAPI { "method" to "get_n_service_nodes", "params" to mapOf( "active_only" to true, -// "limit" to 256, + "limit" to 256, "fields" to mapOf( "public_ip" to true, "storage_port" to true, "pubkey_x25519" to true, "pubkey_ed25519" to true ) ) ) From edc1454609180dd8690dca22a8cff5da34dfb8dc Mon Sep 17 00:00:00 2001 From: jubb Date: Wed, 12 May 2021 16:48:18 +1000 Subject: [PATCH 4/4] fix: unnamed open groups being processed by creating new threads after deletion job db not marking successful/unsuccessful properly handling send and receive better / in order --- .../securesms/database/Storage.kt | 2 +- .../loki/database/SessionJobDatabase.kt | 8 +-- .../libsession/messaging/jobs/JobQueue.kt | 59 ++++++++++++------- .../messaging/jobs/MessageSendJob.kt | 8 ++- .../ReceivedMessageHandler.kt | 7 ++- 5 files changed, 55 insertions(+), 29 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index 6a2ff6e8df..f93670ae88 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -581,7 +581,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, val database = DatabaseFactory.getThreadDatabase(context) if (!openGroupID.isNullOrEmpty()) { val recipient = Recipient.from(context, Address.fromSerialized(GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())), false) - return database.getOrCreateThreadIdFor(recipient) + return database.getThreadIdIfExistsFor(recipient) } else if (!groupPublicKey.isNullOrEmpty()) { val recipient = Recipient.from(context, Address.fromSerialized(GroupUtil.doubleEncodeGroupID(groupPublicKey)), false) return database.getOrCreateThreadIdFor(recipient) diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt index 3647a9937c..d8c072dd5f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt @@ -29,15 +29,15 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa contentValues.put(jobType, job.getFactoryKey()) contentValues.put(failureCount, job.failureCount) contentValues.put(serializedData, SessionJobHelper.dataSerializer.serialize(job.serialize())) - database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(jobID)) + database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(job.id!!)) } fun markJobAsSucceeded(jobID: String) { - databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf( jobID )) + databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID )) } fun markJobAsFailed(jobID: String) { - databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf( jobID )) + databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID )) } fun getAllPendingJobs(type: String): Map { @@ -75,7 +75,7 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa var cursor: android.database.Cursor? = null try { cursor = database.rawQuery("SELECT * FROM $sessionJobTable WHERE $jobID = ?", arrayOf( job.id )) - return cursor != null && cursor.moveToFirst() + return cursor == null || !cursor.moveToFirst() } catch (e: Exception) { // Do nothing } finally { diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index 246a766fce..fab49384fd 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -17,29 +17,50 @@ import kotlin.math.roundToLong class JobQueue : JobDelegate { private var hasResumedPendingJobs = false // Just for debugging private val jobTimestampMap = ConcurrentHashMap() - private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() - private val multiDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher() + private val rxDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + private val txDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + private val attachmentDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher() private val scope = GlobalScope + SupervisorJob() private val queue = Channel(UNLIMITED) val timer = Timer() + private fun CoroutineScope.processWithDispatcher(channel: Channel, dispatcher: CoroutineDispatcher) = launch(dispatcher) { + for (job in channel) { + if (!isActive) break + job.delegate = this@JobQueue + job.execute() + } + } + init { // Process jobs - scope.launch(dispatcher) { + scope.launch { + val rxQueue = Channel(capacity = 1024) + val txQueue = Channel(capacity = 1024) + val attachmentQueue = Channel(capacity = 1024) + + val receiveJob = processWithDispatcher(rxQueue, rxDispatcher) + val txJob = processWithDispatcher(txQueue, txDispatcher) + val attachmentJob = processWithDispatcher(attachmentQueue, attachmentDispatcher) + while (isActive) { - queue.receive().let { job -> - if (job.canExecuteParallel()) { - launch(multiDispatcher) { - job.delegate = this@JobQueue - job.execute() - } - } else { - job.delegate = this@JobQueue - job.execute() + for (job in queue) { + when (job) { + is NotifyPNServerJob, + is AttachmentUploadJob, + is MessageSendJob -> txQueue.send(job) + is AttachmentDownloadJob -> attachmentQueue.send(job) + else -> rxQueue.send(job) } } } + + // job has been cancelled + receiveJob.cancel() + txJob.cancel() + attachmentJob.cancel() + } } @@ -49,14 +70,6 @@ class JobQueue : JobDelegate { val shared: JobQueue by lazy { JobQueue() } } - private fun Job.canExecuteParallel(): Boolean { - return this.javaClass in arrayOf( - MessageSendJob::class.java, - AttachmentUploadJob::class.java, - AttachmentDownloadJob::class.java - ) - } - fun add(job: Job) { addWithoutExecuting(job) queue.offer(job) // offer always called on unlimited capacity @@ -112,8 +125,10 @@ class JobQueue : JobDelegate { override fun handleJobFailed(job: Job, error: Exception) { job.failureCount += 1 val storage = MessagingModuleConfiguration.shared.storage - if (storage.isJobCanceled(job)) { return Log.i("Loki", "${job::class.simpleName} canceled.")} - if (job.failureCount == job.maxFailureCount) { + if (storage.isJobCanceled(job)) { + return Log.i("Loki", "${job::class.simpleName} canceled.") + } + if (job.failureCount >= job.maxFailureCount) { handleJobFailedPermanently(job, error) } else { storage.persistJob(job) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt index 6c187b686a..1a0e4e57f4 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt @@ -12,6 +12,9 @@ import org.session.libsession.messaging.sending_receiving.MessageSender import org.session.libsignal.utilities.logging.Log class MessageSendJob(val message: Message, val destination: Destination) : Job { + + object AwaitingUploadException: Exception("Awaiting attachment upload") + override var delegate: JobDelegate? = null override var id: String? = null override var failureCount: Int = 0 @@ -46,7 +49,10 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { JobQueue.shared.add(job) } } - if (attachmentsToUpload.isNotEmpty()) return // Wait for all attachments to upload before continuing + if (attachmentsToUpload.isNotEmpty()) { + this.handleFailure(AwaitingUploadException) + return + } // Wait for all attachments to upload before continuing } MessageSender.send(this.message, this.destination).success { this.handleSuccess() diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt index 30f5d81743..41eb261b95 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt @@ -155,6 +155,11 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS val threadID = storage.getOrCreateThreadIdFor(message.syncTarget ?: message.sender!!, message.groupPublicKey, openGroupID) + if (threadID < 0) { + // thread doesn't exist, should only be reached in a case where we are processing open group messages for no longer existent thread + throw MessageReceiver.Error.NoThread + } + val openGroup = threadID.let { storage.getOpenGroup(it.toString()) } @@ -233,7 +238,7 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS } val openGroupServerID = message.openGroupServerMessageID if (openGroupServerID != null) { - storage.setOpenGroupServerMessageID(messageID, openGroupServerID, threadID, !(message.isMediaMessage() || attachments.isNotEmpty())) + storage.setOpenGroupServerMessageID(messageID, openGroupServerID, threadID, !message.isMediaMessage()) } // Cancel any typing indicators if needed cancelTypingIndicatorsIfNeeded(message.sender!!)