From 4fff5ac2dc2b03f4e363335c09a4a63a25195028 Mon Sep 17 00:00:00 2001 From: jubb Date: Fri, 7 May 2021 11:48:03 +1000 Subject: [PATCH] refactor: make storage reference jobId by string in deletion, don't persist jobs we are about to delete, delete jobs that fail to serialize from storage (probably from corrupt or moved data classes) in temporary message send jobs --- .../securesms/ApplicationContext.java | 1 - .../securesms/database/Storage.kt | 10 +++--- .../loki/database/SessionJobDatabase.kt | 21 ++++++++---- .../libsession/messaging/StorageProtocol.kt | 6 ++-- .../messaging/jobs/AttachmentUploadJob.kt | 2 +- .../libsession/messaging/jobs/JobQueue.kt | 34 ++++++++++++++----- 6 files changed, 49 insertions(+), 25 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index 33e55e18d3..cc61634f9a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -103,7 +103,6 @@ import dagger.ObjectGraph; import kotlin.Unit; import kotlinx.coroutines.Job; import network.loki.messenger.BuildConfig; -import nl.komponents.kovenant.Kovenant; import static nl.komponents.kovenant.android.KovenantAndroid.startKovenant; import static nl.komponents.kovenant.android.KovenantAndroid.stopKovenant; diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index 0f449c9f3a..0ab4824343 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -185,15 +185,15 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, DatabaseFactory.getSessionJobDatabase(context).persistJob(job) } - override fun markJobAsSucceeded(job: Job) { - DatabaseFactory.getSessionJobDatabase(context).markJobAsSucceeded(job) + override fun markJobAsSucceeded(jobId: String) { + DatabaseFactory.getSessionJobDatabase(context).markJobAsSucceeded(jobId) } - override fun markJobAsFailed(job: Job) { - DatabaseFactory.getSessionJobDatabase(context).markJobAsFailed(job) + override fun markJobAsFailed(jobId: String) { + DatabaseFactory.getSessionJobDatabase(context).markJobAsFailed(jobId) } - override fun getAllPendingJobs(type: String): List { + override fun getAllPendingJobs(type: String): Map { return DatabaseFactory.getSessionJobDatabase(context).getAllPendingJobs(type) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt index dda3b7d7eb..04edd3715e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt @@ -4,6 +4,7 @@ import android.content.ContentValues import android.content.Context import net.sqlcipher.Cursor import org.session.libsession.messaging.jobs.* +import org.session.libsignal.utilities.logging.Log import org.thoughtcrime.securesms.database.Database import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer @@ -30,19 +31,25 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(jobID)) } - fun markJobAsSucceeded(job: Job) { - databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(job.id)) + fun markJobAsSucceeded(jobId: String) { + databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(jobId)) } - fun markJobAsFailed(job: Job) { - databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(job.id)) + fun markJobAsFailed(jobId: String) { + databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(jobId)) } - fun getAllPendingJobs(type: String): List { + fun getAllPendingJobs(type: String): Map { val database = databaseHelper.readableDatabase return database.getAll(sessionJobTable, "$jobType = ?", arrayOf(type)) { cursor -> - jobFromCursor(cursor) - } + val jobId = cursor.getString(jobID) + try { + jobId to jobFromCursor(cursor) + } catch (e: Exception) { + Log.e("Loki", "Error serializing Job of type $type",e) + jobId to null + } + }.toMap() } fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? { diff --git a/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt b/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt index f64aa1a032..da604264d1 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt @@ -44,9 +44,9 @@ interface StorageProtocol { // Jobs fun persistJob(job: Job) - fun markJobAsSucceeded(job: Job) - fun markJobAsFailed(job: Job) - fun getAllPendingJobs(type: String): List + fun markJobAsSucceeded(jobId: String) + fun markJobAsFailed(jobId: String) + fun getAllPendingJobs(type: String): Map fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? fun getMessageSendJob(messageSendJobID: String): MessageSendJob? fun resumeMessageSendJobIfNeeded(messageSendJobID: String) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt index f135178618..1d8b1a7170 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt @@ -108,7 +108,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess val messageSendJob = storage.getMessageSendJob(messageSendJobID) MessageSender.handleFailedMessageSend(this.message, e) if (messageSendJob != null) { - storage.markJobAsFailed(messageSendJob) + storage.markJobAsFailed(messageSendJobID) } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index 9b1f65f4a7..85a1f33289 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -78,10 +78,24 @@ class JobQueue : JobDelegate { return } hasResumedPendingJobs = true - val allJobTypes = listOf(AttachmentDownloadJob.KEY, AttachmentDownloadJob.KEY, MessageReceiveJob.KEY, MessageSendJob.KEY, NotifyPNServerJob.KEY) + val allJobTypes = listOf(AttachmentDownloadJob.KEY, + AttachmentDownloadJob.KEY, + MessageReceiveJob.KEY, + MessageSendJob.KEY, + NotifyPNServerJob.KEY + ) allJobTypes.forEach { type -> val allPendingJobs = MessagingModuleConfiguration.shared.storage.getAllPendingJobs(type) - allPendingJobs.sortedBy { it.id }.forEach { job -> + val pendingJobs = mutableListOf() + for ((id, job) in allPendingJobs) { + if (job == null) { + // job failed to serialize, remove it from the DB + handleJobFailedPermanently(id) + } else { + pendingJobs.add(job) + } + } + pendingJobs.sortedBy { it.id }.forEach { job -> Log.i("Jobs", "Resuming pending job of type: ${job::class.simpleName}.") queue.offer(job) // Offer always called on unlimited capacity } @@ -89,17 +103,18 @@ class JobQueue : JobDelegate { } override fun handleJobSucceeded(job: Job) { - MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(job) + val jobId = job.id ?: return + MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(jobId) } override fun handleJobFailed(job: Job, error: Exception) { job.failureCount += 1 val storage = MessagingModuleConfiguration.shared.storage if (storage.isJobCanceled(job)) { return Log.i("Jobs", "${job::class.simpleName} canceled.")} - storage.persistJob(job) if (job.failureCount == job.maxFailureCount) { - storage.markJobAsFailed(job) + handleJobFailedPermanently(job, error) } else { + storage.persistJob(job) val retryInterval = getRetryInterval(job) Log.i("Jobs", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).") timer.schedule(delay = retryInterval) { @@ -110,10 +125,13 @@ class JobQueue : JobDelegate { } override fun handleJobFailedPermanently(job: Job, error: Exception) { - job.failureCount += 1 + val jobId = job.id ?: return + handleJobFailedPermanently(jobId) + } + + private fun handleJobFailedPermanently(jobId: String) { val storage = MessagingModuleConfiguration.shared.storage - storage.persistJob(job) - storage.markJobAsFailed(job) + storage.markJobAsFailed(jobId) } private fun getRetryInterval(job: Job): Long {