mirror of
https://github.com/oxen-io/session-android.git
synced 2025-08-12 11:27:49 +00:00
Merge branch 'dev' of https://github.com/loki-project/session-android into open-group-invitations
This commit is contained in:
@@ -44,9 +44,9 @@ interface StorageProtocol {
|
||||
|
||||
// Jobs
|
||||
fun persistJob(job: Job)
|
||||
fun markJobAsSucceeded(job: Job)
|
||||
fun markJobAsFailed(job: Job)
|
||||
fun getAllPendingJobs(type: String): List<Job>
|
||||
fun markJobAsSucceeded(jobId: String)
|
||||
fun markJobAsFailed(jobId: String)
|
||||
fun getAllPendingJobs(type: String): Map<String,Job?>
|
||||
fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob?
|
||||
fun getMessageSendJob(messageSendJobID: String): MessageSendJob?
|
||||
fun resumeMessageSendJobIfNeeded(messageSendJobID: String)
|
||||
|
@@ -108,7 +108,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
|
||||
val messageSendJob = storage.getMessageSendJob(messageSendJobID)
|
||||
MessageSender.handleFailedMessageSend(this.message, e)
|
||||
if (messageSendJob != null) {
|
||||
storage.markJobAsFailed(messageSendJob)
|
||||
storage.markJobAsFailed(messageSendJobID)
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -78,10 +78,24 @@ class JobQueue : JobDelegate {
|
||||
return
|
||||
}
|
||||
hasResumedPendingJobs = true
|
||||
val allJobTypes = listOf(AttachmentDownloadJob.KEY, AttachmentDownloadJob.KEY, MessageReceiveJob.KEY, MessageSendJob.KEY, NotifyPNServerJob.KEY)
|
||||
val allJobTypes = listOf(AttachmentUploadJob.KEY,
|
||||
AttachmentDownloadJob.KEY,
|
||||
MessageReceiveJob.KEY,
|
||||
MessageSendJob.KEY,
|
||||
NotifyPNServerJob.KEY
|
||||
)
|
||||
allJobTypes.forEach { type ->
|
||||
val allPendingJobs = MessagingModuleConfiguration.shared.storage.getAllPendingJobs(type)
|
||||
allPendingJobs.sortedBy { it.id }.forEach { job ->
|
||||
val pendingJobs = mutableListOf<Job>()
|
||||
for ((id, job) in allPendingJobs) {
|
||||
if (job == null) {
|
||||
// job failed to serialize, remove it from the DB
|
||||
handleJobFailedPermanently(id)
|
||||
} else {
|
||||
pendingJobs.add(job)
|
||||
}
|
||||
}
|
||||
pendingJobs.sortedBy { it.id }.forEach { job ->
|
||||
Log.i("Jobs", "Resuming pending job of type: ${job::class.simpleName}.")
|
||||
queue.offer(job) // Offer always called on unlimited capacity
|
||||
}
|
||||
@@ -89,17 +103,18 @@ class JobQueue : JobDelegate {
|
||||
}
|
||||
|
||||
override fun handleJobSucceeded(job: Job) {
|
||||
MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(job)
|
||||
val jobId = job.id ?: return
|
||||
MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(jobId)
|
||||
}
|
||||
|
||||
override fun handleJobFailed(job: Job, error: Exception) {
|
||||
job.failureCount += 1
|
||||
val storage = MessagingModuleConfiguration.shared.storage
|
||||
if (storage.isJobCanceled(job)) { return Log.i("Jobs", "${job::class.simpleName} canceled.")}
|
||||
storage.persistJob(job)
|
||||
if (job.failureCount == job.maxFailureCount) {
|
||||
storage.markJobAsFailed(job)
|
||||
handleJobFailedPermanently(job, error)
|
||||
} else {
|
||||
storage.persistJob(job)
|
||||
val retryInterval = getRetryInterval(job)
|
||||
Log.i("Jobs", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).")
|
||||
timer.schedule(delay = retryInterval) {
|
||||
@@ -110,10 +125,13 @@ class JobQueue : JobDelegate {
|
||||
}
|
||||
|
||||
override fun handleJobFailedPermanently(job: Job, error: Exception) {
|
||||
job.failureCount += 1
|
||||
val jobId = job.id ?: return
|
||||
handleJobFailedPermanently(jobId)
|
||||
}
|
||||
|
||||
private fun handleJobFailedPermanently(jobId: String) {
|
||||
val storage = MessagingModuleConfiguration.shared.storage
|
||||
storage.persistJob(job)
|
||||
storage.markJobAsFailed(job)
|
||||
storage.markJobAsFailed(jobId)
|
||||
}
|
||||
|
||||
private fun getRetryInterval(job: Job): Long {
|
||||
|
@@ -79,7 +79,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
|
||||
override fun serialize(): Data {
|
||||
val kryo = Kryo()
|
||||
kryo.isRegistrationRequired = false
|
||||
val output = Output(ByteArray(4096), -1) // maxBufferSize '-1' will dynamically grow internally if we run out of room serializing the message
|
||||
val output = Output(ByteArray(4096), 10_000_000)
|
||||
kryo.writeClassAndObject(output, message)
|
||||
output.close()
|
||||
val serializedMessage = output.toBytes()
|
||||
|
Reference in New Issue
Block a user