MessageReceive & Send Jobs implementations

This commit is contained in:
Brice 2020-12-15 15:50:15 +11:00
parent e9c5eb5257
commit 3a10d8c1b4
2 changed files with 92 additions and 2 deletions

View File

@ -1,6 +1,13 @@
package org.session.libsession.messaging.jobs package org.session.libsession.messaging.jobs
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
import org.session.libsession.messaging.sending_receiving.MessageReceiver
import org.session.libsession.messaging.sending_receiving.handle
import org.session.libsignal.libsignal.logging.Log
class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val openGroupMessageServerID: Long? = null, val openGroupID: String? = null) : Job { class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val openGroupMessageServerID: Long? = null, val openGroupID: String? = null) : Job {
override var delegate: JobDelegate? = null override var delegate: JobDelegate? = null
override var id: String? = null override var id: String? = null
override var failureCount: Int = 0 override var failureCount: Int = 0
@ -8,10 +15,43 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val
// Settings // Settings
override val maxFailureCount: Int = 10 override val maxFailureCount: Int = 10
companion object { companion object {
val TAG = MessageReceiveJob::class.qualifiedName
val collection: String = "MessageReceiveJobCollection" val collection: String = "MessageReceiveJobCollection"
} }
override fun execute() { override fun execute() {
TODO("Not yet implemented") exec()
}
fun exec(): Promise<Unit, Exception> {
val deferred = deferred<Unit, Exception>()
try {
val (message, proto) = MessageReceiver.parse(this.data, this.openGroupMessageServerID)
MessageReceiver.handle(message, proto, this.openGroupID)
this.handleSuccess()
deferred.resolve(Unit)
} catch (e: Exception) {
Log.d(TAG, "Couldn't receive message due to error: $e.")
val error = e as? MessageReceiver.Error
error?.let {
if (!error.isRetryable) this.handlePermanentFailure(error)
}
this.handleFailure(e)
deferred.resolve(Unit) // The promise is just used to keep track of when we're done
}
return deferred.promise
}
private fun handleSuccess() {
delegate?.handleJobSucceeded(this)
}
private fun handlePermanentFailure(e: Exception) {
delegate?.handleJobFailedPermanently(this, e)
}
private fun handleFailure(e: Exception) {
delegate?.handleJobFailed(this, e)
} }
} }

View File

@ -1,9 +1,15 @@
package org.session.libsession.messaging.jobs package org.session.libsession.messaging.jobs
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.messages.Destination import org.session.libsession.messaging.messages.Destination
import org.session.libsession.messaging.messages.Message import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsignal.libsignal.logging.Log
import org.session.libsignal.service.internal.push.SignalServiceProtos
class MessageSendJob(val message: Message, val destination: Destination) : Job { class MessageSendJob(val message: Message, val destination: Destination) : Job {
override var delegate: JobDelegate? = null override var delegate: JobDelegate? = null
override var id: String? = null override var id: String? = null
override var failureCount: Int = 0 override var failureCount: Int = 0
@ -11,10 +17,54 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
// Settings // Settings
override val maxFailureCount: Int = 10 override val maxFailureCount: Int = 10
companion object { companion object {
val TAG = MessageSendJob::class.qualifiedName
val collection: String = "MessageSendJobCollection" val collection: String = "MessageSendJobCollection"
} }
override fun execute() { override fun execute() {
TODO("Not yet implemented") val messageDataProvider = MessagingConfiguration.shared.messageDataProvider
val message = message as? VisibleMessage
message?.let {
if(!messageDataProvider.isOutgoingMessage(message.sentTimestamp!!)) return // The message has been deleted
val attachments = message.attachmentIDs.map { messageDataProvider.getAttachment(it) }.filterNotNull()
val attachmentsToUpload = attachments.filter { !it.isUploaded }
attachmentsToUpload.forEach {
if(MessagingConfiguration.shared.storage.getAttachmentUploadJob(it.attachmentId) != null) {
// Wait for it to finish
} else {
val job = AttachmentUploadJob(it.attachmentId, message.threadID!!, message, id!!)
JobQueue.shared.add(job)
}
}
if (attachmentsToUpload.isNotEmpty()) return // Wait for all attachments to upload before continuing
}
MessageSender.send(this.message, this.destination).success {
this.handleSuccess()
}.fail { exception ->
Log.e(TAG, "Couldn't send message due to error: $exception.")
val e = exception as? MessageSender.Error
e?.let {
if (!e.isRetryable) this.handlePermanentFailure(e)
}
this.handleFailure(exception)
}
}
private fun handleSuccess() {
delegate?.handleJobSucceeded(this)
}
private fun handlePermanentFailure(error: Exception) {
delegate?.handleJobFailedPermanently(this, error)
}
private fun handleFailure(error: Exception) {
Log.w(TAG, "Failed to send $message::class.simpleName.")
val message = message as? VisibleMessage
message?.let {
if(!MessagingConfiguration.shared.messageDataProvider.isOutgoingMessage(message.sentTimestamp!!)) return // The message has been deleted
}
delegate?.handleJobFailed(this, error)
} }
} }