This commit is contained in:
Ryan ZHAO 2021-01-08 11:13:12 +11:00
commit 0820e577e8
7 changed files with 174 additions and 27 deletions

View File

@ -2,17 +2,21 @@ package org.thoughtcrime.securesms.attachments
import android.content.Context
import com.google.protobuf.ByteString
import org.greenrobot.eventbus.EventBus
import org.session.libsession.database.MessageDataProvider
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentState
import org.session.libsession.messaging.sending_receiving.attachments.SessionServiceAttachmentPointer
import org.session.libsession.messaging.sending_receiving.attachments.SessionServiceAttachmentStream
import org.session.libsignal.libsignal.util.guava.Optional
import org.session.libsignal.service.api.messages.SignalServiceAttachment
import org.thoughtcrime.securesms.database.Database
import org.thoughtcrime.securesms.database.DatabaseFactory
import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper
import org.thoughtcrime.securesms.events.PartProgressEvent
import org.thoughtcrime.securesms.jobs.AttachmentUploadJob
import org.thoughtcrime.securesms.mms.PartAuthority
import org.thoughtcrime.securesms.util.MediaUtil
import java.io.InputStream
class DatabaseAttachmentProvider(context: Context, helper: SQLCipherOpenHelper) : Database(context, helper), MessageDataProvider {
@ -39,6 +43,11 @@ class DatabaseAttachmentProvider(context: Context, helper: SQLCipherOpenHelper)
attachmentUploadJob.onRun()
}
override fun insertAttachment(messageId: Long, attachmentId: Long, stream : InputStream) {
val attachmentDatabase = DatabaseFactory.getAttachmentDatabase(context)
attachmentDatabase.insertAttachmentsForPlaceholder(messageId, AttachmentId(attachmentId,0), stream)
}
override fun isOutgoingMessage(timestamp: Long): Boolean {
val smsDatabase = DatabaseFactory.getSmsDatabase(context)
return smsDatabase.isOutgoingMessage(timestamp)
@ -52,7 +61,9 @@ fun DatabaseAttachment.toAttachmentPointer(): SessionServiceAttachmentPointer {
fun DatabaseAttachment.toAttachmentStream(context: Context): SessionServiceAttachmentStream {
val stream = PartAuthority.getAttachmentStream(context, this.dataUri!!)
var attachmentStream = SessionServiceAttachmentStream(stream, this.contentType, this.size, Optional.fromNullable(this.fileName), this.isVoiceNote, Optional.absent(), this.width, this.height, Optional.fromNullable(this.caption), null)
val listener = SignalServiceAttachment.ProgressListener { total: Long, progress: Long -> EventBus.getDefault().postSticky(PartProgressEvent(this, total, progress))}
var attachmentStream = SessionServiceAttachmentStream(stream, this.contentType, this.size, Optional.fromNullable(this.fileName), this.isVoiceNote, Optional.absent(), this.width, this.height, Optional.fromNullable(this.caption), listener)
attachmentStream.attachmentId = this.attachmentId.rowId
attachmentStream.isAudio = MediaUtil.isAudio(this)
attachmentStream.isGif = MediaUtil.isGif(this)
@ -60,8 +71,7 @@ fun DatabaseAttachment.toAttachmentStream(context: Context): SessionServiceAttac
attachmentStream.isImage = MediaUtil.isImage(this)
attachmentStream.key = ByteString.copyFrom(this.key?.toByteArray())
attachmentStream.digest = this.digest
//attachmentStream.flags = if (this.isVoiceNote) SignalServiceProtos.AttachmentPointer.Flags.VOICE_MESSAGE.number else 0
attachmentStream.digest = Optional.fromNullable(this.digest)
attachmentStream.url = this.url

View File

@ -73,4 +73,6 @@ dependencies {
testImplementation "org.assertj:assertj-core:1.7.1"
testImplementation "org.conscrypt:conscrypt-openjdk-uber:2.0.0"
implementation 'org.greenrobot:eventbus:3.0.0'
}

View File

@ -4,6 +4,7 @@ import org.session.libsession.messaging.sending_receiving.attachments.Attachment
import org.session.libsession.messaging.sending_receiving.attachments.SessionServiceAttachmentPointer
import org.session.libsession.messaging.sending_receiving.attachments.SessionServiceAttachmentStream
import org.session.libsignal.service.api.messages.SignalServiceAttachmentPointer
import java.io.InputStream
interface MessageDataProvider {
@ -20,6 +21,8 @@ interface MessageDataProvider {
fun setAttachmentState(attachmentState: AttachmentState, attachmentId: Long, messageID: Long)
fun insertAttachment(messageId: Long, attachmentId: Long, stream : InputStream)
fun isOutgoingMessage(timestamp: Long): Boolean
@Throws(Exception::class)

View File

@ -1,17 +1,84 @@
package org.session.libsession.messaging.jobs
class AttachmentDownloadJob: Job {
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.fileserver.FileServerAPI
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentState
import org.session.libsession.messaging.utilities.DotNetAPI
import org.session.libsignal.service.api.crypto.AttachmentCipherInputStream
import java.io.File
import java.io.FileInputStream
class AttachmentDownloadJob(val attachmentID: Long, val tsIncomingMessageID: Long): Job {
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
private val MAX_ATTACHMENT_SIZE = 10 * 1024 * 1024
// Error
internal sealed class Error(val description: String) : Exception() {
object NoAttachment : Error("No such attachment.")
}
// Settings
override val maxFailureCount: Int = 100
override val maxFailureCount: Int = 20
companion object {
val collection: String = "AttachmentDownloadJobCollection"
}
override fun execute() {
TODO("Not yet implemented")
val messageDataProvider = MessagingConfiguration.shared.messageDataProvider
val attachmentStream = messageDataProvider.getAttachmentStream(attachmentID) ?: return handleFailure(Error.NoAttachment)
messageDataProvider.setAttachmentState(AttachmentState.STARTED, attachmentID, this.tsIncomingMessageID)
val tempFile = createTempFile()
val handleFailure: (java.lang.Exception) -> Unit = { exception ->
tempFile.delete()
if(exception is Error && exception == Error.NoAttachment) {
MessagingConfiguration.shared.messageDataProvider.setAttachmentState(AttachmentState.FAILED, attachmentID, tsIncomingMessageID)
this.handlePermanentFailure(exception)
} else if (exception is DotNetAPI.Error && exception == DotNetAPI.Error.ParsingFailed) {
// No need to retry if the response is invalid. Most likely this means we (incorrectly)
// got a "Cannot GET ..." error from the file server.
MessagingConfiguration.shared.messageDataProvider.setAttachmentState(AttachmentState.FAILED, attachmentID, tsIncomingMessageID)
this.handlePermanentFailure(exception)
} else {
this.handleFailure(exception)
}
}
try {
FileServerAPI.shared.downloadFile(tempFile, attachmentStream.url, MAX_ATTACHMENT_SIZE, attachmentStream.listener)
} catch (e: Exception) {
return handleFailure(e)
}
// DECRYPTION
// Assume we're retrieving an attachment for an open group server if the digest is not set
var stream = if (!attachmentStream.digest.isPresent || attachmentStream.key == null) FileInputStream(tempFile)
else AttachmentCipherInputStream.createForAttachment(tempFile, attachmentStream.length.or(0).toLong(), attachmentStream.key?.toByteArray(), attachmentStream?.digest.get())
messageDataProvider.insertAttachment(tsIncomingMessageID, attachmentID, stream)
tempFile.delete()
}
private fun handleSuccess() {
delegate?.handleJobSucceeded(this)
}
private fun handlePermanentFailure(e: Exception) {
delegate?.handleJobFailedPermanently(this, e)
}
private fun handleFailure(e: Exception) {
delegate?.handleJobFailed(this, e)
}
private fun createTempFile(): File {
val file = File.createTempFile("push-attachment", "tmp", MessagingConfiguration.shared.context.cacheDir)
file.deleteOnExit()
return file
}
}

View File

@ -92,20 +92,6 @@ abstract class SessionServiceAttachment protected constructor(val contentType: S
}
}
/**
* An interface to receive progress information on upload/download of
* an attachment.
*/
/*interface ProgressListener {
/**
* Called on a progress change event.
*
* @param total The total amount to transmit/receive in bytes.
* @param progress The amount that has been transmitted/received in bytes thus far
*/
fun onAttachmentProgress(total: Long, progress: Long)
}*/
companion object {
@JvmStatic
fun newStreamBuilder(): Builder {

View File

@ -22,7 +22,7 @@ class SessionServiceAttachmentStream(val inputStream: InputStream?, contentType:
// Though now required, `digest` may be null for pre-existing records or from
// messages received from other clients
var digest: ByteArray? = null
var digest: Optional<ByteArray> = Optional.absent()
// This only applies for attachments being uploaded.
var isUploaded: Boolean = false
@ -48,7 +48,7 @@ class SessionServiceAttachmentStream(val inputStream: InputStream?, contentType:
builder.size = this.length.toInt()
builder.key = this.key
builder.digest = ByteString.copyFrom(this.digest)
builder.digest = ByteString.copyFrom(this.digest.get())
builder.flags = if (this.voiceNote) SignalServiceProtos.AttachmentPointer.Flags.VOICE_MESSAGE.number else 0
//TODO I did copy the behavior of iOS below, not sure if that's relevant here...

View File

@ -4,19 +4,19 @@ import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map
import nl.komponents.kovenant.then
import okhttp3.MediaType
import okhttp3.MultipartBody
import okhttp3.Request
import okhttp3.RequestBody
import okhttp3.*
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.snode.OnionRequestAPI
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.messaging.fileserver.FileServerAPI
import org.session.libsession.messaging.sending_receiving.MessageReceiver
import org.session.libsession.messaging.sending_receiving.attachments.SessionServiceAttachment
import org.session.libsignal.libsignal.logging.Log
import org.session.libsignal.libsignal.loki.DiffieHellman
import org.session.libsignal.service.api.crypto.ProfileCipherOutputStream
import org.session.libsignal.service.api.messages.SignalServiceAttachment
import org.session.libsignal.service.api.push.exceptions.NonSuccessfulResponseCodeException
import org.session.libsignal.service.api.push.exceptions.PushNetworkException
import org.session.libsignal.service.api.util.StreamDetails
@ -29,6 +29,9 @@ import org.session.libsignal.service.internal.util.Hex
import org.session.libsignal.service.internal.util.JsonUtil
import org.session.libsignal.service.loki.api.utilities.HTTP
import org.session.libsignal.service.loki.utilities.*
import java.io.File
import java.io.FileOutputStream
import java.io.OutputStream
import java.util.*
/**
@ -48,6 +51,8 @@ open class DotNetAPI {
object DecryptionFailed : Error("Couldn't decrypt file.")
object MaxFileSizeExceeded : Error("Maximum file size exceeded.")
object TokenExpired: Error("Token expired.") // Session Android
internal val isRetryable: Boolean = false
}
companion object {
@ -56,7 +61,7 @@ open class DotNetAPI {
public data class UploadResult(val id: Long, val url: String, val digest: ByteArray?)
public fun getAuthToken(server: String): Promise<String, Exception> {
fun getAuthToken(server: String): Promise<String, Exception> {
val storage = MessagingConfiguration.shared.storage
val token = storage.getAuthToken(server)
if (token != null) { return Promise.of(token) }
@ -175,6 +180,80 @@ open class DotNetAPI {
return execute(HTTPVerb.PATCH, server, "users/me", parameters = parameters)
}
// DOWNLOAD
/**
* Blocks the calling thread.
*/
fun downloadFile(destination: File, url: String, maxSize: Int, listener: SignalServiceAttachment.ProgressListener?) {
val outputStream = FileOutputStream(destination) // Throws
var remainingAttempts = 4
var exception: Exception? = null
while (remainingAttempts > 0) {
remainingAttempts -= 1
try {
downloadFile(outputStream, url, maxSize, listener)
exception = null
break
} catch (e: Exception) {
exception = e
}
}
if (exception != null) { throw exception }
}
/**
* Blocks the calling thread.
*/
fun downloadFile(outputStream: OutputStream, url: String, maxSize: Int, listener: SignalServiceAttachment.ProgressListener?) {
// We need to throw a PushNetworkException or NonSuccessfulResponseCodeException
// because the underlying Signal logic requires these to work correctly
val oldPrefixedHost = "https://" + HttpUrl.get(url).host()
var newPrefixedHost = oldPrefixedHost
if (oldPrefixedHost.contains(FileServerAPI.fileStorageBucketURL)) {
newPrefixedHost = FileServerAPI.shared.server
}
// Edge case that needs to work: https://file-static.lokinet.org/i1pNmpInq3w9gF3TP8TFCa1rSo38J6UM
// → https://file.getsession.org/loki/v1/f/XLxogNXVEIWHk14NVCDeppzTujPHxu35
val fileID = url.substringAfter(oldPrefixedHost).substringAfter("/f/")
val sanitizedURL = "$newPrefixedHost/loki/v1/f/$fileID"
val request = Request.Builder().url(sanitizedURL).get()
try {
val serverPublicKey = if (newPrefixedHost.contains(FileServerAPI.shared.server)) FileServerAPI.fileServerPublicKey
else FileServerAPI.shared.getPublicKeyForOpenGroupServer(newPrefixedHost).get()
val json = OnionRequestAPI.sendOnionRequest(request.build(), newPrefixedHost, serverPublicKey, isJSONRequired = false).get()
val result = json["result"] as? String
if (result == null) {
Log.d("Loki", "Couldn't parse attachment from: $json.")
throw PushNetworkException("Missing response body.")
}
val body = Base64.decode(result)
if (body.size > maxSize) {
Log.d("Loki", "Attachment size limit exceeded.")
throw PushNetworkException("Max response size exceeded.")
}
val input = body.inputStream()
val buffer = ByteArray(32768)
var count = 0
var bytes = input.read(buffer)
while (bytes >= 0) {
outputStream.write(buffer, 0, bytes)
count += bytes
if (count > maxSize) {
Log.d("Loki", "Attachment size limit exceeded.")
throw PushNetworkException("Max response size exceeded.")
}
listener?.onAttachmentProgress(body.size.toLong(), count.toLong())
bytes = input.read(buffer)
}
} catch (e: Exception) {
Log.d("Loki", "Couldn't download attachment due to error: $e.")
throw if (e is NonSuccessfulResponseCodeException) e else PushNetworkException(e)
}
}
// UPLOAD
@Throws(PushNetworkException::class, NonSuccessfulResponseCodeException::class)
fun uploadAttachment(server: String, attachment: PushAttachmentData): UploadResult {
// This function mimics what Signal does in PushServiceSocket