fix: adding some message receive functionality

This commit is contained in:
jubb
2021-03-12 17:15:33 +11:00
parent ca7202f255
commit 323fb75149
12 changed files with 93 additions and 67 deletions

View File

@@ -23,7 +23,7 @@ interface MessageDataProvider {
fun setAttachmentState(attachmentState: AttachmentState, attachmentId: Long, messageID: Long)
fun insertAttachment(messageId: Long, attachmentId: Long, stream : InputStream)
fun insertAttachment(messageId: Long, attachmentId: AttachmentId, stream : InputStream)
fun isOutgoingMessage(timestamp: Long): Boolean

View File

@@ -5,6 +5,8 @@ import org.session.libsession.messaging.fileserver.FileServerAPI
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentState
import org.session.libsession.messaging.utilities.DotNetAPI
import org.session.libsignal.service.api.crypto.AttachmentCipherInputStream
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.logging.Log
import java.io.File
import java.io.FileInputStream
@@ -33,7 +35,8 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
override fun execute() {
val messageDataProvider = MessagingConfiguration.shared.messageDataProvider
val attachmentStream = messageDataProvider.getAttachmentStream(attachmentID) ?: return handleFailure(Error.NoAttachment)
messageDataProvider.getDatabaseAttachment(attachmentID)
val attachment = messageDataProvider.getDatabaseAttachment(attachmentID) ?: return handleFailure(Error.NoAttachment)
messageDataProvider.setAttachmentState(AttachmentState.STARTED, attachmentID, this.databaseMessageID)
val tempFile = createTempFile()
val handleFailure: (java.lang.Exception) -> Unit = { exception ->
@@ -51,7 +54,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
}
}
try {
FileServerAPI.shared.downloadFile(tempFile, attachmentStream.url, MAX_ATTACHMENT_SIZE, attachmentStream.listener)
FileServerAPI.shared.downloadFile(tempFile, attachment.url, MAX_ATTACHMENT_SIZE, null)
} catch (e: Exception) {
return handleFailure(e)
}
@@ -59,16 +62,17 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
// DECRYPTION
// Assume we're retrieving an attachment for an open group server if the digest is not set
var stream = if (!attachmentStream.digest.isPresent || attachmentStream.key == null) FileInputStream(tempFile)
else AttachmentCipherInputStream.createForAttachment(tempFile, attachmentStream.length.or(0).toLong(), attachmentStream.key?.toByteArray(), attachmentStream?.digest.get())
val stream = if (attachment.digest == null || attachment.key == null) FileInputStream(tempFile)
else AttachmentCipherInputStream.createForAttachment(tempFile, attachment.size, Base64.decode(attachment.key), attachment.digest)
messageDataProvider.insertAttachment(databaseMessageID, attachmentID, stream)
messageDataProvider.insertAttachment(databaseMessageID, attachment.attachmentId, stream)
tempFile.delete()
handleSuccess()
}
private fun handleSuccess() {
Log.w(AttachmentUploadJob.TAG, "Attachment downloaded successfully.")
delegate?.handleJobSucceeded(this)
}

View File

@@ -15,7 +15,6 @@ import org.session.libsignal.service.internal.push.PushAttachmentData
import org.session.libsignal.service.internal.push.http.AttachmentCipherOutputStreamFactory
import org.session.libsignal.service.internal.util.Util
import org.session.libsignal.service.loki.utilities.PlaintextOutputStreamFactory
import org.session.libsignal.utilities.ThreadUtils
import org.session.libsignal.utilities.logging.Log
class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val message: Message, val messageSendJobID: String) : Job {
@@ -45,41 +44,40 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
}
override fun execute() {
ThreadUtils.queue {
try {
val attachment = MessagingConfiguration.shared.messageDataProvider.getScaledSignalAttachmentStream(attachmentID)
?: return@queue handleFailure(Error.NoAttachment)
try {
val attachment = MessagingConfiguration.shared.messageDataProvider.getScaledSignalAttachmentStream(attachmentID)
?: return handleFailure(Error.NoAttachment)
var server = FileServerAPI.shared.server
var shouldEncrypt = true
val usePadding = false
val openGroup = MessagingConfiguration.shared.storage.getOpenGroup(threadID)
openGroup?.let {
server = it.server
shouldEncrypt = false
}
var server = FileServerAPI.shared.server
var shouldEncrypt = true
val usePadding = false
val openGroup = MessagingConfiguration.shared.storage.getOpenGroup(threadID)
openGroup?.let {
server = it.server
shouldEncrypt = false
}
val attachmentKey = Util.getSecretBytes(64)
val paddedLength = if (usePadding) PaddingInputStream.getPaddedSize(attachment.length) else attachment.length
val dataStream = if (usePadding) PaddingInputStream(attachment.inputStream, attachment.length) else attachment.inputStream
val ciphertextLength = if (shouldEncrypt) AttachmentCipherOutputStream.getCiphertextLength(paddedLength) else attachment.length
val attachmentKey = Util.getSecretBytes(64)
val paddedLength = if (usePadding) PaddingInputStream.getPaddedSize(attachment.length) else attachment.length
val dataStream = if (usePadding) PaddingInputStream(attachment.inputStream, attachment.length) else attachment.inputStream
val ciphertextLength = if (shouldEncrypt) AttachmentCipherOutputStream.getCiphertextLength(paddedLength) else attachment.length
val outputStreamFactory = if (shouldEncrypt) AttachmentCipherOutputStreamFactory(attachmentKey) else PlaintextOutputStreamFactory()
val attachmentData = PushAttachmentData(attachment.contentType, dataStream, ciphertextLength, outputStreamFactory, attachment.listener)
val outputStreamFactory = if (shouldEncrypt) AttachmentCipherOutputStreamFactory(attachmentKey) else PlaintextOutputStreamFactory()
val attachmentData = PushAttachmentData(attachment.contentType, dataStream, ciphertextLength, outputStreamFactory, attachment.listener)
val uploadResult = FileServerAPI.shared.uploadAttachment(server, attachmentData)
handleSuccess(attachment, attachmentKey, uploadResult)
val uploadResult = FileServerAPI.shared.uploadAttachment(server, attachmentData)
handleSuccess(attachment, attachmentKey, uploadResult)
} catch (e: java.lang.Exception) {
if (e is Error && e == Error.NoAttachment) {
this.handlePermanentFailure(e)
} else if (e is DotNetAPI.Error && !e.isRetryable) {
this.handlePermanentFailure(e)
} else {
this.handleFailure(e)
}
} catch (e: java.lang.Exception) {
if (e is Error && e == Error.NoAttachment) {
this.handlePermanentFailure(e)
} else if (e is DotNetAPI.Error && !e.isRetryable) {
this.handlePermanentFailure(e)
} else {
this.handleFailure(e)
}
}
}
private fun handleSuccess(attachment: SignalServiceAttachmentStream, attachmentKey: ByteArray, uploadResult: DotNetAPI.UploadResult) {

View File

@@ -1,32 +1,51 @@
package org.session.libsession.messaging.jobs
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsignal.utilities.logging.Log
import java.util.*
import java.util.concurrent.Executors
import kotlin.concurrent.schedule
import kotlin.math.min
import kotlin.math.pow
import java.util.Timer
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsignal.utilities.logging.Log
import kotlin.concurrent.schedule
import kotlin.math.roundToLong
class JobQueue : JobDelegate {
private var hasResumedPendingJobs = false // Just for debugging
private val dispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher()
private val scope = GlobalScope + SupervisorJob()
private val queue = Channel<Job>(UNLIMITED)
init {
// process jobs
scope.launch {
while (isActive) {
queue.receive().let { job ->
launch(dispatcher) {
job.delegate = this@JobQueue
job.execute()
}
}
}
}
}
companion object {
val shared: JobQueue by lazy { JobQueue() }
}
fun add(job: Job) {
addWithoutExecuting(job)
job.execute()
queue.offer(job) // offer always called on unlimited capacity
}
fun addWithoutExecuting(job: Job) {
job.id = System.currentTimeMillis().toString()
MessagingConfiguration.shared.storage.persistJob(job)
job.delegate = this
}
fun resumePendingJobs() {
@@ -40,8 +59,7 @@ class JobQueue : JobDelegate {
val allPendingJobs = MessagingConfiguration.shared.storage.getAllPendingJobs(type)
allPendingJobs.sortedBy { it.id }.forEach { job ->
Log.i("Jobs", "Resuming pending job of type: ${job::class.simpleName}.")
job.delegate = this
job.execute()
queue.offer(job) // offer always called on unlimited capacity
}
}
}

View File

@@ -38,13 +38,13 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val
this.handleSuccess()
deferred.resolve(Unit)
} catch (e: Exception) {
Log.d(TAG, "Couldn't receive message due to error: $e.")
Log.e(TAG, "Couldn't receive message due to error", e)
val error = e as? MessageReceiver.Error
if (error != null && !error.isRetryable) {
Log.d("Loki", "Message receive job permanently failed due to error: $error.")
Log.e("Loki", "Message receive job permanently failed due to error", e)
this.handlePermanentFailure(error)
} else {
Log.d("Loki", "Couldn't receive message due to error: $e.")
Log.e("Loki", "Couldn't receive message due to error", e)
this.handleFailure(e)
}
deferred.resolve(Unit) // The promise is just used to keep track of when we're done

View File

@@ -3,12 +3,12 @@ package org.session.libsession.messaging.messages.visible
import android.util.Size
import android.webkit.MimeTypeMap
import com.google.protobuf.ByteString
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentTransferProgress
import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment
import org.session.libsignal.service.api.messages.SignalServiceAttachmentPointer
import org.session.libsession.messaging.sending_receiving.attachments.Attachment as SignalAttachment
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.utilities.Base64
import java.io.File
import org.session.libsession.messaging.sending_receiving.attachments.Attachment as SignalAttachment
class Attachment {
@@ -101,7 +101,7 @@ class Attachment {
fun toSignalAttachment(): SignalAttachment? {
if (!isValid()) return null
return DatabaseAttachment(null, 0, false, false, contentType, 0,
sizeInBytes?.toLong() ?: 0, fileName, null, key.toString(), null, digest, null, kind == Kind.VOICE_MESSAGE,
sizeInBytes?.toLong() ?: 0, if (fileName.isNullOrEmpty()) null else fileName, null, Base64.encodeBytes(key), null, digest, null, kind == Kind.VOICE_MESSAGE,
size?.width ?: 0, size?.height ?: 0, false, caption, url)
}
}

View File

@@ -12,7 +12,7 @@ class VisibleMessage : Message() {
var syncTarget: String? = null
var text: String? = null
var attachmentIDs:List<Long> = mutableListOf()
val attachmentIDs: MutableList<Long> = mutableListOf()
var quote: Quote? = null
var linkPreview: LinkPreview? = null
var contact: Contact? = null
@@ -51,7 +51,7 @@ class VisibleMessage : Message() {
val databaseAttachment = it as DatabaseAttachment
databaseAttachment.attachmentId.rowId
}
this.attachmentIDs = attachmentIDs as ArrayList<Long>
this.attachmentIDs.addAll(attachmentIDs)
}
fun isMediaMessage(): Boolean {

View File

@@ -2,9 +2,9 @@ package org.session.libsession.messaging.opengroups
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.utilities.Hex
import org.session.libsignal.service.loki.utilities.removing05PrefixIfNeeded
import org.session.libsignal.utilities.Hex
import org.session.libsignal.utilities.logging.Log
import org.whispersystems.curve25519.Curve25519
data class OpenGroupMessage(
@@ -26,7 +26,7 @@ data class OpenGroupMessage(
fun from(message: VisibleMessage, server: String): OpenGroupMessage? {
val storage = MessagingConfiguration.shared.storage
val userPublicKey = storage.getUserPublicKey() ?: return null
var attachmentIDs = message.attachmentIDs
val attachmentIDs = message.attachmentIDs
// Validation
if (!message.isValid()) { return null } // Should be valid at this point
// Quote

View File

@@ -122,7 +122,7 @@ object MessageReceiver {
message.openGroupServerMessageID = openGroupServerID
// Validate
var isValid = message.isValid()
if (message is VisibleMessage && !isValid && proto.dataMessage.attachmentsCount == 0) { isValid = true }
if (message is VisibleMessage && !isValid && proto.dataMessage.attachmentsCount != 0) { isValid = true }
if (!isValid) { throw Error.InvalidMessage }
// Return
return Pair(message, proto)

View File

@@ -133,7 +133,7 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
}
}
val attachmentIDs = storage.persistAttachments(message.id ?: 0, attachments)
message.attachmentIDs = attachmentIDs.toMutableList()
message.attachmentIDs.addAll(attachmentIDs.toMutableList())
// Update profile if needed
val newProfile = message.profile
if (newProfile != null) {