refactor: make storage reference jobId by string in deletion, don't persist jobs we are about to delete, delete jobs that fail to serialize from storage (probably from corrupt or moved data classes) in temporary message send jobs

This commit is contained in:
jubb 2021-05-07 11:48:03 +10:00
parent 2f23a0e59f
commit 4fff5ac2dc
6 changed files with 49 additions and 25 deletions

View File

@ -103,7 +103,6 @@ import dagger.ObjectGraph;
import kotlin.Unit; import kotlin.Unit;
import kotlinx.coroutines.Job; import kotlinx.coroutines.Job;
import network.loki.messenger.BuildConfig; import network.loki.messenger.BuildConfig;
import nl.komponents.kovenant.Kovenant;
import static nl.komponents.kovenant.android.KovenantAndroid.startKovenant; import static nl.komponents.kovenant.android.KovenantAndroid.startKovenant;
import static nl.komponents.kovenant.android.KovenantAndroid.stopKovenant; import static nl.komponents.kovenant.android.KovenantAndroid.stopKovenant;

View File

@ -185,15 +185,15 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
DatabaseFactory.getSessionJobDatabase(context).persistJob(job) DatabaseFactory.getSessionJobDatabase(context).persistJob(job)
} }
override fun markJobAsSucceeded(job: Job) { override fun markJobAsSucceeded(jobId: String) {
DatabaseFactory.getSessionJobDatabase(context).markJobAsSucceeded(job) DatabaseFactory.getSessionJobDatabase(context).markJobAsSucceeded(jobId)
} }
override fun markJobAsFailed(job: Job) { override fun markJobAsFailed(jobId: String) {
DatabaseFactory.getSessionJobDatabase(context).markJobAsFailed(job) DatabaseFactory.getSessionJobDatabase(context).markJobAsFailed(jobId)
} }
override fun getAllPendingJobs(type: String): List<Job> { override fun getAllPendingJobs(type: String): Map<String, Job?> {
return DatabaseFactory.getSessionJobDatabase(context).getAllPendingJobs(type) return DatabaseFactory.getSessionJobDatabase(context).getAllPendingJobs(type)
} }

View File

@ -4,6 +4,7 @@ import android.content.ContentValues
import android.content.Context import android.content.Context
import net.sqlcipher.Cursor import net.sqlcipher.Cursor
import org.session.libsession.messaging.jobs.* import org.session.libsession.messaging.jobs.*
import org.session.libsignal.utilities.logging.Log
import org.thoughtcrime.securesms.database.Database import org.thoughtcrime.securesms.database.Database
import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper
import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer
@ -30,19 +31,25 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(jobID)) database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(jobID))
} }
fun markJobAsSucceeded(job: Job) { fun markJobAsSucceeded(jobId: String) {
databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(job.id)) databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(jobId))
} }
fun markJobAsFailed(job: Job) { fun markJobAsFailed(jobId: String) {
databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(job.id)) databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(jobId))
} }
fun getAllPendingJobs(type: String): List<Job> { fun getAllPendingJobs(type: String): Map<String, Job?> {
val database = databaseHelper.readableDatabase val database = databaseHelper.readableDatabase
return database.getAll(sessionJobTable, "$jobType = ?", arrayOf(type)) { cursor -> return database.getAll(sessionJobTable, "$jobType = ?", arrayOf(type)) { cursor ->
jobFromCursor(cursor) val jobId = cursor.getString(jobID)
try {
jobId to jobFromCursor(cursor)
} catch (e: Exception) {
Log.e("Loki", "Error serializing Job of type $type",e)
jobId to null
} }
}.toMap()
} }
fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? { fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? {

View File

@ -44,9 +44,9 @@ interface StorageProtocol {
// Jobs // Jobs
fun persistJob(job: Job) fun persistJob(job: Job)
fun markJobAsSucceeded(job: Job) fun markJobAsSucceeded(jobId: String)
fun markJobAsFailed(job: Job) fun markJobAsFailed(jobId: String)
fun getAllPendingJobs(type: String): List<Job> fun getAllPendingJobs(type: String): Map<String,Job?>
fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob?
fun getMessageSendJob(messageSendJobID: String): MessageSendJob? fun getMessageSendJob(messageSendJobID: String): MessageSendJob?
fun resumeMessageSendJobIfNeeded(messageSendJobID: String) fun resumeMessageSendJobIfNeeded(messageSendJobID: String)

View File

@ -108,7 +108,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
val messageSendJob = storage.getMessageSendJob(messageSendJobID) val messageSendJob = storage.getMessageSendJob(messageSendJobID)
MessageSender.handleFailedMessageSend(this.message, e) MessageSender.handleFailedMessageSend(this.message, e)
if (messageSendJob != null) { if (messageSendJob != null) {
storage.markJobAsFailed(messageSendJob) storage.markJobAsFailed(messageSendJobID)
} }
} }

View File

@ -78,10 +78,24 @@ class JobQueue : JobDelegate {
return return
} }
hasResumedPendingJobs = true hasResumedPendingJobs = true
val allJobTypes = listOf(AttachmentDownloadJob.KEY, AttachmentDownloadJob.KEY, MessageReceiveJob.KEY, MessageSendJob.KEY, NotifyPNServerJob.KEY) val allJobTypes = listOf(AttachmentDownloadJob.KEY,
AttachmentDownloadJob.KEY,
MessageReceiveJob.KEY,
MessageSendJob.KEY,
NotifyPNServerJob.KEY
)
allJobTypes.forEach { type -> allJobTypes.forEach { type ->
val allPendingJobs = MessagingModuleConfiguration.shared.storage.getAllPendingJobs(type) val allPendingJobs = MessagingModuleConfiguration.shared.storage.getAllPendingJobs(type)
allPendingJobs.sortedBy { it.id }.forEach { job -> val pendingJobs = mutableListOf<Job>()
for ((id, job) in allPendingJobs) {
if (job == null) {
// job failed to serialize, remove it from the DB
handleJobFailedPermanently(id)
} else {
pendingJobs.add(job)
}
}
pendingJobs.sortedBy { it.id }.forEach { job ->
Log.i("Jobs", "Resuming pending job of type: ${job::class.simpleName}.") Log.i("Jobs", "Resuming pending job of type: ${job::class.simpleName}.")
queue.offer(job) // Offer always called on unlimited capacity queue.offer(job) // Offer always called on unlimited capacity
} }
@ -89,17 +103,18 @@ class JobQueue : JobDelegate {
} }
override fun handleJobSucceeded(job: Job) { override fun handleJobSucceeded(job: Job) {
MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(job) val jobId = job.id ?: return
MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(jobId)
} }
override fun handleJobFailed(job: Job, error: Exception) { override fun handleJobFailed(job: Job, error: Exception) {
job.failureCount += 1 job.failureCount += 1
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
if (storage.isJobCanceled(job)) { return Log.i("Jobs", "${job::class.simpleName} canceled.")} if (storage.isJobCanceled(job)) { return Log.i("Jobs", "${job::class.simpleName} canceled.")}
storage.persistJob(job)
if (job.failureCount == job.maxFailureCount) { if (job.failureCount == job.maxFailureCount) {
storage.markJobAsFailed(job) handleJobFailedPermanently(job, error)
} else { } else {
storage.persistJob(job)
val retryInterval = getRetryInterval(job) val retryInterval = getRetryInterval(job)
Log.i("Jobs", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).") Log.i("Jobs", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).")
timer.schedule(delay = retryInterval) { timer.schedule(delay = retryInterval) {
@ -110,10 +125,13 @@ class JobQueue : JobDelegate {
} }
override fun handleJobFailedPermanently(job: Job, error: Exception) { override fun handleJobFailedPermanently(job: Job, error: Exception) {
job.failureCount += 1 val jobId = job.id ?: return
handleJobFailedPermanently(jobId)
}
private fun handleJobFailedPermanently(jobId: String) {
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
storage.persistJob(job) storage.markJobAsFailed(jobId)
storage.markJobAsFailed(job)
} }
private fun getRetryInterval(job: Job): Long { private fun getRetryInterval(job: Job): Long {