diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt index 036502604b..4c06c85547 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt @@ -1,4 +1,17 @@ -package org.session.messaging.jobs +package org.session.libsession.messaging.jobs class AttachmentDownloadJob: Job { + override var delegate: JobDelegate? = null + override var id: String? = null + override var failureCount: Int = 0 + + // Settings + override val maxFailureCount: Int = 100 + companion object { + val collection: String = "AttachmentDownloadJobCollection" + } + + override fun execute() { + TODO("Not yet implemented") + } } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt index 8cd49dff8c..d0fc7ce7e7 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt @@ -1,4 +1,17 @@ -package org.session.messaging.jobs +package org.session.libsession.messaging.jobs class AttachmentUploadJob : Job { + override var delegate: JobDelegate? = null + override var id: String? = null + override var failureCount: Int = 0 + + // Settings + override val maxFailureCount: Int = 20 + companion object { + val collection: String = "AttachmentUploadJobCollection" + } + + override fun execute() { + TODO("Not yet implemented") + } } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt index 2913063b16..c3108835cf 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt @@ -1,4 +1,11 @@ -package org.session.messaging.jobs +package org.session.libsession.messaging.jobs interface Job { + var delegate: JobDelegate? + var id: String? + var failureCount: Int + + val maxFailureCount: Int + + fun execute() } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobDelegate.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobDelegate.kt index ba4f2c9afc..0efe78fbda 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobDelegate.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobDelegate.kt @@ -1,4 +1,7 @@ -package org.session.messaging.jobs +package org.session.libsession.messaging.jobs interface JobDelegate { + fun handleJobSucceeded(job: Job) + fun handleJobFailed(job: Job, error: Exception) + fun handleJobFailedPermanently(job: Job, error: Exception) } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index 5e3fa3990e..0054a77b1d 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -1,4 +1,89 @@ -package org.session.messaging.jobs +package org.session.libsession.messaging.jobs + +import kotlin.math.min +import kotlin.math.pow +import java.util.Timer + +import org.session.libsession.messaging.Configuration + +import org.session.libsignal.libsignal.logging.Log +import kotlin.concurrent.schedule +import kotlin.math.roundToLong + class JobQueue : JobDelegate { + private var hasResumedPendingJobs = false // Just for debugging + + companion object { + val shared: JobQueue by lazy { JobQueue() } + } + + fun add(job: Job) { + addWithoutExecuting(job) + job.execute() + } + + fun addWithoutExecuting(job: Job) { + job.id = System.currentTimeMillis().toString() + Configuration.shared.storage.persist(job) + job.delegate = this + } + + fun resumePendingJobs() { + if (hasResumedPendingJobs) { + Log.d("Loki", "resumePendingJobs() should only be called once.") + return + } + hasResumedPendingJobs = true + val allJobTypes = listOf(AttachmentDownloadJob.collection, AttachmentDownloadJob.collection, MessageReceiveJob.collection, MessageSendJob.collection, NotifyPNServerJob.collection) + allJobTypes.forEach { type -> + val allPendingJobs = Configuration.shared.storage.getAllPendingJobs(type) + allPendingJobs.sortedBy { it.id }.forEach { job -> + Log.i("Jobs", "Resuming pending job of type: ${job::class.simpleName}.") + job.delegate = this + job.execute() + } + } + } + + override fun handleJobSucceeded(job: Job) { + Configuration.shared.storage.markJobAsSucceeded(job) + } + + override fun handleJobFailed(job: Job, error: Exception) { + job.failureCount += 1 + val storage = Configuration.shared.storage + if (storage.isJobCanceled(job)) { return Log.i("Jobs", "${job::class.simpleName} canceled.")} + storage.persist(job) + if (job.failureCount == job.maxFailureCount) { + storage.markJobAsFailed(job) + } else { + val retryInterval = getRetryInterval(job) + Log.i("Jobs", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).") + Timer().schedule(delay = retryInterval) { + Log.i("Jobs", "Retrying ${job::class.simpleName}.") + job.execute() + } + } + } + + override fun handleJobFailedPermanently(job: Job, error: Exception) { + job.failureCount += 1 + val storage = Configuration.shared.storage + storage.persist(job) + storage.markJobAsFailed(job) + } + + private fun getRetryInterval(job: Job): Long { + // Arbitrary backoff factor... + // try 1 delay: 0ms + // try 2 delay: 190ms + // ... + // try 5 delay: 1300ms + // ... + // try 11 delay: 61310ms + val backoffFactor = 1.9 + val maxBackoff = (60 * 60 * 1000).toDouble() + return (100 * min(maxBackoff, backoffFactor.pow(job.failureCount))).roundToLong() + } } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt index cc12a399bf..fb5e3d34b5 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt @@ -1,4 +1,17 @@ -package org.session.messaging.jobs +package org.session.libsession.messaging.jobs class MessageReceiveJob : Job { + override var delegate: JobDelegate? = null + override var id: String? = null + override var failureCount: Int = 0 + + // Settings + override val maxFailureCount: Int = 10 + companion object { + val collection: String = "MessageReceiveJobCollection" + } + + override fun execute() { + TODO("Not yet implemented") + } } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt index a9c44b77b1..9e32623fde 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt @@ -1,4 +1,17 @@ -package org.session.messaging.jobs +package org.session.libsession.messaging.jobs class MessageSendJob : Job { + override var delegate: JobDelegate? = null + override var id: String? = null + override var failureCount: Int = 0 + + // Settings + override val maxFailureCount: Int = 10 + companion object { + val collection: String = "MessageSendJobCollection" + } + + override fun execute() { + TODO("Not yet implemented") + } } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt index 134ddb7c45..b9221efcf5 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt @@ -1,4 +1,57 @@ -package org.session.messaging.jobs +package org.session.libsession.messaging.jobs -class NotifyPNServerJob : Job { +import nl.komponents.kovenant.functional.map +import okhttp3.MediaType +import okhttp3.Request +import okhttp3.RequestBody + +import org.session.libsession.messaging.sending_receiving.notifications.PushNotificationAPI +import org.session.libsession.snode.SnodeMessage +import org.session.libsession.snode.OnionRequestAPI + +import org.session.libsignal.libsignal.logging.Log +import org.session.libsignal.service.internal.util.JsonUtil +import org.session.libsignal.service.loki.utilities.retryIfNeeded + +class NotifyPNServerJob(val message: SnodeMessage) : Job { + override var delegate: JobDelegate? = null + override var id: String? = null + override var failureCount: Int = 0 + + // Settings + override val maxFailureCount: Int = 20 + companion object { + val collection: String = "NotifyPNServerJobCollection" + } + + // Running + override fun execute() { + val server = PushNotificationAPI.server + val parameters = mapOf( "data" to message.data, "send_to" to message.recipient ) + val url = "${server}/notify" + val body = RequestBody.create(MediaType.get("application/json"), JsonUtil.toJson(parameters)) + val request = Request.Builder().url(url).post(body) + retryIfNeeded(4) { + OnionRequestAPI.sendOnionRequest(request.build(), server, PushNotificationAPI.serverPublicKey, "/loki/v2/lsrpc").map { json -> + val code = json["code"] as? Int + if (code == null || code == 0) { + Log.d("Loki", "[Loki] Couldn't notify PN server due to error: ${json["message"] as? String ?: "null"}.") + } + }.fail { exception -> + Log.d("Loki", "[Loki] Couldn't notify PN server due to error: $exception.") + } + }.success { + handleSuccess() + }. fail { + handleFailure(it) + } + } + + private fun handleSuccess() { + delegate?.handleJobSucceeded(this) + } + + private fun handleFailure(error: Exception) { + delegate?.handleJobFailed(this, error) + } } \ No newline at end of file