Merge branch 'dev' of https://github.com/oxen-io/session-android into client-side-nickname

This commit is contained in:
Ryan ZHAO 2021-05-07 16:31:54 +10:00
commit be3b8a4b7e
8 changed files with 52 additions and 28 deletions

View File

@ -158,8 +158,8 @@ dependencies {
testImplementation 'org.robolectric:shadows-multidex:4.2' testImplementation 'org.robolectric:shadows-multidex:4.2'
} }
def canonicalVersionCode = 157 def canonicalVersionCode = 158
def canonicalVersionName = "1.10.0" def canonicalVersionName = "1.10.1"
def postFixSize = 10 def postFixSize = 10
def abiPostFix = ['armeabi-v7a' : 1, def abiPostFix = ['armeabi-v7a' : 1,

View File

@ -103,7 +103,6 @@ import dagger.ObjectGraph;
import kotlin.Unit; import kotlin.Unit;
import kotlinx.coroutines.Job; import kotlinx.coroutines.Job;
import network.loki.messenger.BuildConfig; import network.loki.messenger.BuildConfig;
import nl.komponents.kovenant.Kovenant;
import static nl.komponents.kovenant.android.KovenantAndroid.startKovenant; import static nl.komponents.kovenant.android.KovenantAndroid.startKovenant;
import static nl.komponents.kovenant.android.KovenantAndroid.stopKovenant; import static nl.komponents.kovenant.android.KovenantAndroid.stopKovenant;

View File

@ -180,15 +180,15 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
DatabaseFactory.getSessionJobDatabase(context).persistJob(job) DatabaseFactory.getSessionJobDatabase(context).persistJob(job)
} }
override fun markJobAsSucceeded(job: Job) { override fun markJobAsSucceeded(jobId: String) {
DatabaseFactory.getSessionJobDatabase(context).markJobAsSucceeded(job) DatabaseFactory.getSessionJobDatabase(context).markJobAsSucceeded(jobId)
} }
override fun markJobAsFailed(job: Job) { override fun markJobAsFailed(jobId: String) {
DatabaseFactory.getSessionJobDatabase(context).markJobAsFailed(job) DatabaseFactory.getSessionJobDatabase(context).markJobAsFailed(jobId)
} }
override fun getAllPendingJobs(type: String): List<Job> { override fun getAllPendingJobs(type: String): Map<String, Job?> {
return DatabaseFactory.getSessionJobDatabase(context).getAllPendingJobs(type) return DatabaseFactory.getSessionJobDatabase(context).getAllPendingJobs(type)
} }

View File

@ -4,6 +4,7 @@ import android.content.ContentValues
import android.content.Context import android.content.Context
import net.sqlcipher.Cursor import net.sqlcipher.Cursor
import org.session.libsession.messaging.jobs.* import org.session.libsession.messaging.jobs.*
import org.session.libsignal.utilities.logging.Log
import org.thoughtcrime.securesms.database.Database import org.thoughtcrime.securesms.database.Database
import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper
import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer
@ -30,19 +31,25 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(jobID)) database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(jobID))
} }
fun markJobAsSucceeded(job: Job) { fun markJobAsSucceeded(jobId: String) {
databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(job.id)) databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(jobId))
} }
fun markJobAsFailed(job: Job) { fun markJobAsFailed(jobId: String) {
databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(job.id)) databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(jobId))
} }
fun getAllPendingJobs(type: String): List<Job> { fun getAllPendingJobs(type: String): Map<String, Job?> {
val database = databaseHelper.readableDatabase val database = databaseHelper.readableDatabase
return database.getAll(sessionJobTable, "$jobType = ?", arrayOf(type)) { cursor -> return database.getAll(sessionJobTable, "$jobType = ?", arrayOf(type)) { cursor ->
jobFromCursor(cursor) val jobId = cursor.getString(jobID)
try {
jobId to jobFromCursor(cursor)
} catch (e: Exception) {
Log.e("Loki", "Error serializing Job of type $type",e)
jobId to null
} }
}.toMap()
} }
fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? { fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? {

View File

@ -44,9 +44,9 @@ interface StorageProtocol {
// Jobs // Jobs
fun persistJob(job: Job) fun persistJob(job: Job)
fun markJobAsSucceeded(job: Job) fun markJobAsSucceeded(jobId: String)
fun markJobAsFailed(job: Job) fun markJobAsFailed(jobId: String)
fun getAllPendingJobs(type: String): List<Job> fun getAllPendingJobs(type: String): Map<String,Job?>
fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob?
fun getMessageSendJob(messageSendJobID: String): MessageSendJob? fun getMessageSendJob(messageSendJobID: String): MessageSendJob?
fun resumeMessageSendJobIfNeeded(messageSendJobID: String) fun resumeMessageSendJobIfNeeded(messageSendJobID: String)

View File

@ -108,7 +108,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
val messageSendJob = storage.getMessageSendJob(messageSendJobID) val messageSendJob = storage.getMessageSendJob(messageSendJobID)
MessageSender.handleFailedMessageSend(this.message, e) MessageSender.handleFailedMessageSend(this.message, e)
if (messageSendJob != null) { if (messageSendJob != null) {
storage.markJobAsFailed(messageSendJob) storage.markJobAsFailed(messageSendJobID)
} }
} }

View File

@ -78,10 +78,24 @@ class JobQueue : JobDelegate {
return return
} }
hasResumedPendingJobs = true hasResumedPendingJobs = true
val allJobTypes = listOf(AttachmentDownloadJob.KEY, AttachmentDownloadJob.KEY, MessageReceiveJob.KEY, MessageSendJob.KEY, NotifyPNServerJob.KEY) val allJobTypes = listOf(AttachmentUploadJob.KEY,
AttachmentDownloadJob.KEY,
MessageReceiveJob.KEY,
MessageSendJob.KEY,
NotifyPNServerJob.KEY
)
allJobTypes.forEach { type -> allJobTypes.forEach { type ->
val allPendingJobs = MessagingModuleConfiguration.shared.storage.getAllPendingJobs(type) val allPendingJobs = MessagingModuleConfiguration.shared.storage.getAllPendingJobs(type)
allPendingJobs.sortedBy { it.id }.forEach { job -> val pendingJobs = mutableListOf<Job>()
for ((id, job) in allPendingJobs) {
if (job == null) {
// job failed to serialize, remove it from the DB
handleJobFailedPermanently(id)
} else {
pendingJobs.add(job)
}
}
pendingJobs.sortedBy { it.id }.forEach { job ->
Log.i("Jobs", "Resuming pending job of type: ${job::class.simpleName}.") Log.i("Jobs", "Resuming pending job of type: ${job::class.simpleName}.")
queue.offer(job) // Offer always called on unlimited capacity queue.offer(job) // Offer always called on unlimited capacity
} }
@ -89,17 +103,18 @@ class JobQueue : JobDelegate {
} }
override fun handleJobSucceeded(job: Job) { override fun handleJobSucceeded(job: Job) {
MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(job) val jobId = job.id ?: return
MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(jobId)
} }
override fun handleJobFailed(job: Job, error: Exception) { override fun handleJobFailed(job: Job, error: Exception) {
job.failureCount += 1 job.failureCount += 1
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
if (storage.isJobCanceled(job)) { return Log.i("Jobs", "${job::class.simpleName} canceled.")} if (storage.isJobCanceled(job)) { return Log.i("Jobs", "${job::class.simpleName} canceled.")}
storage.persistJob(job)
if (job.failureCount == job.maxFailureCount) { if (job.failureCount == job.maxFailureCount) {
storage.markJobAsFailed(job) handleJobFailedPermanently(job, error)
} else { } else {
storage.persistJob(job)
val retryInterval = getRetryInterval(job) val retryInterval = getRetryInterval(job)
Log.i("Jobs", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).") Log.i("Jobs", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).")
timer.schedule(delay = retryInterval) { timer.schedule(delay = retryInterval) {
@ -110,10 +125,13 @@ class JobQueue : JobDelegate {
} }
override fun handleJobFailedPermanently(job: Job, error: Exception) { override fun handleJobFailedPermanently(job: Job, error: Exception) {
job.failureCount += 1 val jobId = job.id ?: return
handleJobFailedPermanently(jobId)
}
private fun handleJobFailedPermanently(jobId: String) {
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
storage.persistJob(job) storage.markJobAsFailed(jobId)
storage.markJobAsFailed(job)
} }
private fun getRetryInterval(job: Job): Long { private fun getRetryInterval(job: Job): Long {

View File

@ -79,7 +79,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
override fun serialize(): Data { override fun serialize(): Data {
val kryo = Kryo() val kryo = Kryo()
kryo.isRegistrationRequired = false kryo.isRegistrationRequired = false
val output = Output(ByteArray(4096), -1) // maxBufferSize '-1' will dynamically grow internally if we run out of room serializing the message val output = Output(ByteArray(4096), 10_000_000)
kryo.writeClassAndObject(output, message) kryo.writeClassAndObject(output, message)
output.close() output.close()
val serializedMessage = output.toBytes() val serializedMessage = output.toBytes()