WIP debug sending attachments

This commit is contained in:
Ryan ZHAO 2021-03-03 15:14:45 +11:00
parent 0ea1ed15e7
commit 9a00906069
9 changed files with 197 additions and 64 deletions

View File

@ -7,17 +7,22 @@ import org.session.libsession.database.MessageDataProvider
import org.session.libsession.messaging.sending_receiving.attachments.*
import org.session.libsession.messaging.threads.Address
import org.session.libsession.messaging.utilities.DotNetAPI
import org.session.libsession.utilities.Util
import org.session.libsignal.libsignal.util.guava.Optional
import org.session.libsignal.service.api.messages.SignalServiceAttachment
import org.session.libsignal.service.api.messages.SignalServiceAttachmentPointer
import org.session.libsignal.service.api.messages.SignalServiceAttachmentStream
import org.session.libsignal.utilities.logging.Log
import org.thoughtcrime.securesms.database.AttachmentDatabase
import org.thoughtcrime.securesms.database.Database
import org.thoughtcrime.securesms.database.DatabaseFactory
import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper
import org.thoughtcrime.securesms.events.PartProgressEvent
import org.thoughtcrime.securesms.jobs.AttachmentUploadJob
import org.thoughtcrime.securesms.mms.MediaConstraints
import org.thoughtcrime.securesms.mms.PartAuthority
import org.thoughtcrime.securesms.transport.UndeliverableMessageException
import org.thoughtcrime.securesms.util.MediaUtil
import java.io.IOException
import java.io.InputStream
@ -41,6 +46,14 @@ class DatabaseAttachmentProvider(context: Context, helper: SQLCipherOpenHelper)
return databaseAttachment.toSignalAttachmentStream(context)
}
override fun getScaledSignalAttachmentStream(attachmentId: Long): SignalServiceAttachmentStream? {
val database = DatabaseFactory.getAttachmentDatabase(context)
val databaseAttachment = database.getAttachment(AttachmentId(attachmentId, 0)) ?: return null
val mediaConstraints = MediaConstraints.getPushMediaConstraints()
val scaledAttachment = scaleAndStripExif(database, mediaConstraints, databaseAttachment) ?: return null
return getAttachmentFor(scaledAttachment)
}
override fun getSignalAttachmentPointer(attachmentId: Long): SignalServiceAttachmentPointer? {
val attachmentDatabase = DatabaseFactory.getAttachmentDatabase(context)
val databaseAttachment = attachmentDatabase.getAttachment(AttachmentId(attachmentId, 0)) ?: return null
@ -84,12 +97,28 @@ class DatabaseAttachmentProvider(context: Context, helper: SQLCipherOpenHelper)
return smsDatabase.isOutgoingMessage(timestamp) || mmsDatabase.isOutgoingMessage(timestamp)
}
override fun updateAttachmentAfterUploadSucceeded(attachmentId: Long, uploadResult: DotNetAPI.UploadResult) {
TODO("Not yet implemented")
override fun updateAttachmentAfterUploadSucceeded(attachmentId: Long, attachmentStream: SignalServiceAttachmentStream, attachmentKey: ByteArray, uploadResult: DotNetAPI.UploadResult) {
val database = DatabaseFactory.getAttachmentDatabase(context)
val databaseAttachment = getDatabaseAttachment(attachmentId) ?: return
val attachmentPointer = SignalServiceAttachmentPointer(uploadResult.id,
attachmentStream.contentType,
attachmentKey,
Optional.of(Util.toIntExact(attachmentStream.length)),
attachmentStream.preview,
attachmentStream.width, attachmentStream.height,
Optional.fromNullable(uploadResult.digest),
attachmentStream.fileName,
attachmentStream.voiceNote,
attachmentStream.caption,
uploadResult.url);
val attachment = PointerAttachment.forPointer(Optional.of(attachmentPointer), databaseAttachment.fastPreflightId).get()
database.updateAttachmentAfterUploadSucceeded(databaseAttachment.attachmentId, attachment)
}
override fun updateAttachmentAfterUploadFailed(attachmentId: Long) {
TODO("Not yet implemented")
val database = DatabaseFactory.getAttachmentDatabase(context)
val databaseAttachment = getDatabaseAttachment(attachmentId) ?: return
database.updateAttachmentAfterUploadFailed(databaseAttachment.attachmentId)
}
override fun getMessageID(serverID: Long): Long? {
@ -107,6 +136,47 @@ class DatabaseAttachmentProvider(context: Context, helper: SQLCipherOpenHelper)
return attachmentDatabase.getAttachment(AttachmentId(attachmentId, 0))
}
private fun scaleAndStripExif(attachmentDatabase: AttachmentDatabase, constraints: MediaConstraints, attachment: Attachment): Attachment? {
return try {
if (constraints.isSatisfied(context, attachment)) {
if (MediaUtil.isJpeg(attachment)) {
val stripped = constraints.getResizedMedia(context, attachment)
attachmentDatabase.updateAttachmentData(attachment, stripped)
} else {
attachment
}
} else if (constraints.canResize(attachment)) {
val resized = constraints.getResizedMedia(context, attachment)
attachmentDatabase.updateAttachmentData(attachment, resized)
} else {
throw UndeliverableMessageException("Size constraints could not be met!")
}
} catch (e: Exception) {
return null
}
}
private fun getAttachmentFor(attachment: Attachment): SignalServiceAttachmentStream? {
try {
if (attachment.dataUri == null || attachment.size == 0L) throw IOException("Assertion failed, outgoing attachment has no data!")
val `is` = PartAuthority.getAttachmentStream(context, attachment.dataUri!!)
return SignalServiceAttachment.newStreamBuilder()
.withStream(`is`)
.withContentType(attachment.contentType)
.withLength(attachment.size)
.withFileName(attachment.fileName)
.withVoiceNote(attachment.isVoiceNote)
.withWidth(attachment.width)
.withHeight(attachment.height)
.withCaption(attachment.caption)
.withListener { total: Long, progress: Long -> EventBus.getDefault().postSticky(PartProgressEvent(attachment, total, progress)) }
.build()
} catch (ioe: IOException) {
Log.w("Loki", "Couldn't open attachment", ioe)
}
return null
}
}
fun DatabaseAttachment.toAttachmentPointer(): SessionServiceAttachmentPointer {

View File

@ -22,7 +22,7 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
fun persistJob(job: Job) {
val database = databaseHelper.writableDatabase
val contentValues = ContentValues()
val contentValues = ContentValues(4)
contentValues.put(jobID, job.id)
contentValues.put(jobType, job.getFactoryKey())
contentValues.put(failureCount, job.failureCount)

View File

@ -18,6 +18,7 @@ interface MessageDataProvider {
fun getAttachmentPointer(attachmentId: Long): SessionServiceAttachmentPointer?
fun getSignalAttachmentStream(attachmentId: Long): SignalServiceAttachmentStream?
fun getScaledSignalAttachmentStream(attachmentId: Long): SignalServiceAttachmentStream?
fun getSignalAttachmentPointer(attachmentId: Long): SignalServiceAttachmentPointer?
fun setAttachmentState(attachmentState: AttachmentState, attachmentId: Long, messageID: Long)
@ -26,7 +27,7 @@ interface MessageDataProvider {
fun isOutgoingMessage(timestamp: Long): Boolean
fun updateAttachmentAfterUploadSucceeded(attachmentId: Long, uploadResult: DotNetAPI.UploadResult)
fun updateAttachmentAfterUploadSucceeded(attachmentId: Long, attachmentStream: SignalServiceAttachmentStream, attachmentKey: ByteArray, uploadResult: DotNetAPI.UploadResult)
fun updateAttachmentAfterUploadFailed(attachmentId: Long)
// Quotes

View File

@ -8,11 +8,17 @@ import org.session.libsession.messaging.fileserver.FileServerAPI
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.messaging.utilities.DotNetAPI
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.service.api.crypto.AttachmentCipherOutputStream
import org.session.libsignal.service.api.messages.SignalServiceAttachmentStream
import org.session.libsignal.service.internal.crypto.PaddingInputStream
import org.session.libsignal.service.internal.push.PushAttachmentData
import org.session.libsignal.service.internal.push.http.AttachmentCipherOutputStreamFactory
import org.session.libsignal.service.internal.util.Util
import org.session.libsignal.service.loki.api.opengroups.PublicChat
import org.session.libsignal.service.loki.utilities.PlaintextOutputStreamFactory
import org.session.libsignal.utilities.ThreadUtils
import org.session.libsignal.utilities.logging.Log
import java.io.InputStream
class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val message: Message, val messageSendJobID: String) : Job {
@ -41,47 +47,51 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
}
override fun execute() {
try {
val attachmentStream = MessagingConfiguration.shared.messageDataProvider.getAttachmentStream(attachmentID)
?: return handleFailure(Error.NoAttachment)
ThreadUtils.queue {
try {
val attachment = MessagingConfiguration.shared.messageDataProvider.getScaledSignalAttachmentStream(attachmentID)
?: return@queue handleFailure(Error.NoAttachment)
val openGroup = MessagingConfiguration.shared.storage.getOpenGroup(threadID)
val server = openGroup?.server ?: FileServerAPI.server
var server = FileServerAPI.server
var shouldEncrypt = true
val openGroup = MessagingConfiguration.shared.storage.getOpenGroup(threadID)
openGroup?.let {
server = it.server
shouldEncrypt = false
}
//TODO add some encryption stuff here
val isEncryptionRequired = false
//val isEncryptionRequired = (server == FileServerAPI.server)
val attachmentKey = Util.getSecretBytes(64)
val ciphertextLength = if (shouldEncrypt) AttachmentCipherOutputStream.getCiphertextLength(attachment.length) else attachment.length
val attachmentKey = Util.getSecretBytes(64)
val outputStreamFactory = if (isEncryptionRequired) AttachmentCipherOutputStreamFactory(attachmentKey) else PlaintextOutputStreamFactory()
val ciphertextLength = attachmentStream.length
val outputStreamFactory = if (shouldEncrypt) AttachmentCipherOutputStreamFactory(attachmentKey) else PlaintextOutputStreamFactory()
val attachmentData = PushAttachmentData(attachment.contentType, attachment.inputStream, ciphertextLength, outputStreamFactory, attachment.listener)
val attachmentData = PushAttachmentData(attachmentStream.contentType, attachmentStream.inputStream, ciphertextLength, outputStreamFactory, attachmentStream.listener)
val uploadResult = FileServerAPI.shared.uploadAttachment(server, attachmentData)
handleSuccess(attachment, attachmentKey, uploadResult)
val uploadResult = FileServerAPI.shared.uploadAttachment(server, attachmentData)
handleSuccess(uploadResult)
} catch (e: java.lang.Exception) {
if (e is Error && e == Error.NoAttachment) {
this.handlePermanentFailure(e)
} else if (e is DotNetAPI.Error && !e.isRetryable) {
this.handlePermanentFailure(e)
} else {
this.handleFailure(e)
} catch (e: java.lang.Exception) {
if (e is Error && e == Error.NoAttachment) {
this.handlePermanentFailure(e)
} else if (e is DotNetAPI.Error && !e.isRetryable) {
this.handlePermanentFailure(e)
} else {
this.handleFailure(e)
}
}
}
}
private fun handleSuccess(uploadResult: DotNetAPI.UploadResult) {
private fun handleSuccess(attachment: SignalServiceAttachmentStream, attachmentKey: ByteArray, uploadResult: DotNetAPI.UploadResult) {
Log.w(TAG, "Attachment uploaded successfully.")
delegate?.handleJobSucceeded(this)
//TODO: handle success in database
MessagingConfiguration.shared.messageDataProvider.updateAttachmentAfterUploadSucceeded(attachmentID, attachment, attachmentKey, uploadResult)
MessagingConfiguration.shared.storage.resumeMessageSendJobIfNeeded(messageSendJobID)
}
private fun handlePermanentFailure(e: Exception) {
Log.w(TAG, "Attachment upload failed permanently due to error: $this.")
delegate?.handleJobFailedPermanently(this, e)
MessagingConfiguration.shared.messageDataProvider.updateAttachmentAfterUploadFailed(attachmentID)
failAssociatedMessageSendJob(e)
}
@ -120,7 +130,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
}
override fun getFactoryKey(): String {
return AttachmentDownloadJob.KEY
return KEY
}
class Factory: Job.Factory<AttachmentUploadJob> {

View File

@ -75,7 +75,7 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val
}
override fun getFactoryKey(): String {
return AttachmentDownloadJob.KEY
return KEY
}
class Factory: Job.Factory<MessageReceiveJob> {

View File

@ -32,7 +32,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
val message = message as? VisibleMessage
message?.let {
if(!messageDataProvider.isOutgoingMessage(message.sentTimestamp!!)) return // The message has been deleted
val attachments = message.attachmentIDs.map { messageDataProvider.getDatabaseAttachment(it) }.filterNotNull()
val attachments = message.attachmentIDs.mapNotNull { messageDataProvider.getDatabaseAttachment(it) }
val attachmentsToUpload = attachments.filter { it.url.isNullOrEmpty() }
attachmentsToUpload.forEach {
if (MessagingConfiguration.shared.storage.getAttachmentUploadJob(it.attachmentId.rowId) != null) {
@ -82,10 +82,10 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
val serializedMessage = ByteArray(4096)
val serializedDestination = ByteArray(4096)
var output = Output(serializedMessage)
kryo.writeObject(output, message)
kryo.writeClassAndObject(output, message)
output.close()
output = Output(serializedDestination)
kryo.writeObject(output, destination)
kryo.writeClassAndObject(output, destination)
output.close()
return Data.Builder().putByteArray(KEY_MESSAGE, serializedMessage)
.putByteArray(KEY_DESTINATION, serializedDestination)
@ -93,7 +93,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
}
override fun getFactoryKey(): String {
return AttachmentDownloadJob.KEY
return KEY
}
class Factory: Job.Factory<MessageSendJob> {
@ -103,10 +103,10 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
//deserialize Message and Destination properties
val kryo = Kryo()
var input = Input(serializedMessage)
val message: Message = kryo.readObject(input, Message::class.java)
val message = kryo.readClassAndObject(input) as Message
input.close()
input = Input(serializedDestination)
val destination: Destination = kryo.readObject(input, Destination::class.java)
val destination = kryo.readClassAndObject(input) as Destination
input.close()
return MessageSendJob(message, destination)
}

View File

@ -76,7 +76,7 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job {
}
override fun getFactoryKey(): String {
return AttachmentDownloadJob.KEY
return KEY
}
class Factory: Job.Factory<NotifyPNServerJob> {

View File

@ -7,24 +7,46 @@ import org.session.libsignal.service.loki.utilities.toHexString
sealed class Destination {
class Contact(val publicKey: String) : Destination()
class ClosedGroup(val groupPublicKey: String) : Destination()
class OpenGroup(val channel: Long, val server: String) : Destination()
class Contact() : Destination() {
var publicKey: String = ""
internal constructor(publicKey: String): this() {
this.publicKey = publicKey
}
}
class ClosedGroup() : Destination() {
var groupPublicKey: String = ""
internal constructor(groupPublicKey: String): this() {
this.groupPublicKey = groupPublicKey
}
}
class OpenGroup() : Destination() {
var channel: Long = 0
var server: String = ""
internal constructor(channel: Long, server: String): this() {
this.channel = channel
this.server = server
}
}
companion object {
fun from(address: Address): Destination {
if (address.isContact) {
return Contact(address.contactIdentifier())
} else if (address.isClosedGroup) {
val groupID = address.toGroupString()
val groupPublicKey = GroupUtil.doubleDecodeGroupID(groupID).toHexString()
return ClosedGroup(groupPublicKey)
} else if (address.isOpenGroup) {
val threadID = MessagingConfiguration.shared.storage.getThreadID(address.contactIdentifier())!!
val openGroup = MessagingConfiguration.shared.storage.getOpenGroup(threadID)!!
return OpenGroup(openGroup.channel, openGroup.server)
} else {
throw Exception("TODO: Handle legacy closed groups.")
return when {
address.isContact -> {
Contact(address.contactIdentifier())
}
address.isClosedGroup -> {
val groupID = address.toGroupString()
val groupPublicKey = GroupUtil.doubleDecodeGroupID(groupID).toHexString()
ClosedGroup(groupPublicKey)
}
address.isOpenGroup -> {
val threadID = MessagingConfiguration.shared.storage.getThreadID(address.contactIdentifier())!!
val openGroup = MessagingConfiguration.shared.storage.getOpenGroup(threadID)!!
OpenGroup(openGroup.channel, openGroup.server)
}
else -> {
throw Exception("TODO: Handle legacy closed groups.")
}
}
}
}

View File

@ -1,14 +1,15 @@
package org.session.libsession.messaging.messages.visible
import com.google.protobuf.ByteString
import com.goterl.lazycode.lazysodium.BuildConfig
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment
import org.session.libsession.messaging.sending_receiving.attachments.Attachment as SignalAttachment
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.service.api.messages.SignalServiceAttachmentPointer
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.service.internal.push.SignalServiceProtos.AttachmentPointer
import org.session.libsignal.utilities.logging.Log
import org.session.libsession.messaging.sending_receiving.attachments.Attachment as SignalAttachment
class VisibleMessage : Message() {
@ -108,15 +109,15 @@ class VisibleMessage : Message() {
}
}
//Attachments
val attachments = attachmentIDs.mapNotNull { MessagingConfiguration.shared.messageDataProvider.getAttachmentStream(it) }
if (!attachments.all { it.isUploaded }) {
val attachments = attachmentIDs.mapNotNull { MessagingConfiguration.shared.messageDataProvider.getSignalAttachmentPointer(it) }
if (!attachments.all { !it.url.isNullOrEmpty() }) {
if (BuildConfig.DEBUG) {
//TODO equivalent to iOS's preconditionFailure
Log.d(TAG,"Sending a message before all associated attachments have been uploaded.")
Log.d(TAG, "Sending a message before all associated attachments have been uploaded.")
}
}
val attachmentProtos = attachments.mapNotNull { it.toProto() }
dataMessage.addAllAttachments(attachmentProtos)
val attachmentPointers = attachments.mapNotNull { createAttachmentPointer(it) }
dataMessage.addAllAttachments(attachmentPointers)
// Sync target
if (syncTarget != null) {
dataMessage.syncTarget = syncTarget
@ -131,4 +132,33 @@ class VisibleMessage : Message() {
return null
}
}
private fun createAttachmentPointer(attachment: SignalServiceAttachmentPointer): AttachmentPointer? {
val builder = AttachmentPointer.newBuilder()
.setContentType(attachment.contentType)
.setId(attachment.id)
.setKey(ByteString.copyFrom(attachment.key))
.setDigest(ByteString.copyFrom(attachment.digest.get()))
.setSize(attachment.size.get())
.setUrl(attachment.url)
if (attachment.fileName.isPresent) {
builder.fileName = attachment.fileName.get()
}
if (attachment.preview.isPresent) {
builder.thumbnail = ByteString.copyFrom(attachment.preview.get())
}
if (attachment.width > 0) {
builder.width = attachment.width
}
if (attachment.height > 0) {
builder.height = attachment.height
}
if (attachment.voiceNote) {
builder.flags = AttachmentPointer.Flags.VOICE_MESSAGE_VALUE
}
if (attachment.caption.isPresent) {
builder.caption = attachment.caption.get()
}
return builder.build()
}
}