mirror of
https://github.com/oxen-io/session-android.git
synced 2025-08-11 19:07:40 +00:00
Merge pull request #647 from hjubb/trusted_attachment_download
Only Download Attachments from Trusted Contacts
This commit is contained in:
@@ -3,6 +3,7 @@ package org.session.libsession.database
|
||||
import org.session.libsession.messaging.sending_receiving.attachments.*
|
||||
import org.session.libsession.utilities.Address
|
||||
import org.session.libsession.utilities.UploadResult
|
||||
import org.session.libsession.utilities.recipients.Recipient
|
||||
import org.session.libsignal.messages.SignalServiceAttachmentPointer
|
||||
import org.session.libsignal.messages.SignalServiceAttachmentStream
|
||||
import java.io.InputStream
|
||||
@@ -18,10 +19,11 @@ interface MessageDataProvider {
|
||||
fun getSignalAttachmentStream(attachmentId: Long): SignalServiceAttachmentStream?
|
||||
fun getScaledSignalAttachmentStream(attachmentId: Long): SignalServiceAttachmentStream?
|
||||
fun getSignalAttachmentPointer(attachmentId: Long): SignalServiceAttachmentPointer?
|
||||
fun setAttachmentState(attachmentState: AttachmentState, attachmentId: Long, messageID: Long)
|
||||
fun setAttachmentState(attachmentState: AttachmentState, attachmentId: AttachmentId, messageID: Long)
|
||||
fun insertAttachment(messageId: Long, attachmentId: AttachmentId, stream : InputStream)
|
||||
fun updateAudioAttachmentDuration(attachmentId: AttachmentId, durationMs: Long, threadId: Long)
|
||||
fun isOutgoingMessage(timestamp: Long): Boolean
|
||||
fun isMmsOutgoing(mmsMessageId: Long): Boolean
|
||||
fun isOutgoingMessage(mmsId: Long): Boolean
|
||||
fun handleSuccessfulAttachmentUpload(attachmentId: Long, attachmentStream: SignalServiceAttachmentStream, attachmentKey: ByteArray, uploadResult: UploadResult)
|
||||
fun handleFailedAttachmentUpload(attachmentId: Long)
|
||||
fun getMessageForQuote(timestamp: Long, author: Address): Pair<Long, Boolean>?
|
||||
@@ -29,4 +31,5 @@ interface MessageDataProvider {
|
||||
fun getMessageBodyFor(timestamp: Long, author: String): String
|
||||
fun getAttachmentIDsFor(messageID: Long): List<Long>
|
||||
fun getLinkPreviewAttachmentIDFor(messageID: Long): Long?
|
||||
fun getIndividualRecipientForMms(mmsId: Long): Recipient?
|
||||
}
|
@@ -139,6 +139,7 @@ interface StorageProtocol {
|
||||
fun getContactWithSessionID(sessionID: String): Contact?
|
||||
fun getAllContacts(): Set<Contact>
|
||||
fun setContact(contact: Contact)
|
||||
fun getRecipientForThread(threadId: Long): Recipient?
|
||||
fun getRecipientSettings(address: Address): RecipientSettings?
|
||||
fun addContacts(contacts: List<ConfigurationMessage.Contact>)
|
||||
|
||||
|
@@ -3,6 +3,7 @@ package org.session.libsession.messaging.jobs
|
||||
import okhttp3.HttpUrl
|
||||
import org.session.libsession.messaging.MessagingModuleConfiguration
|
||||
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
|
||||
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentId
|
||||
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentState
|
||||
import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment
|
||||
import org.session.libsession.messaging.utilities.Data
|
||||
@@ -16,6 +17,7 @@ import org.session.libsignal.utilities.Log
|
||||
import java.io.File
|
||||
import java.io.FileInputStream
|
||||
import java.io.InputStream
|
||||
import java.lang.NullPointerException
|
||||
|
||||
class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) : Job {
|
||||
override var delegate: JobDelegate? = null
|
||||
@@ -25,6 +27,9 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
|
||||
// Error
|
||||
internal sealed class Error(val description: String) : Exception(description) {
|
||||
object NoAttachment : Error("No such attachment.")
|
||||
object NoThread: Error("Thread no longer exists")
|
||||
object NoSender: Error("Thread recipient or sender does not exist")
|
||||
object DuplicateData: Error("Attachment already downloaded")
|
||||
}
|
||||
|
||||
// Settings
|
||||
@@ -41,22 +46,56 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
|
||||
override fun execute() {
|
||||
val storage = MessagingModuleConfiguration.shared.storage
|
||||
val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider
|
||||
val handleFailure: (java.lang.Exception) -> Unit = { exception ->
|
||||
val threadID = storage.getThreadIdForMms(databaseMessageID)
|
||||
|
||||
val handleFailure: (java.lang.Exception, attachmentId: AttachmentId?) -> Unit = { exception, attachment ->
|
||||
if (exception == Error.NoAttachment
|
||||
|| exception == Error.NoThread
|
||||
|| exception == Error.NoSender
|
||||
|| (exception is OnionRequestAPI.HTTPRequestFailedAtDestinationException && exception.statusCode == 400)) {
|
||||
messageDataProvider.setAttachmentState(AttachmentState.FAILED, attachmentID, databaseMessageID)
|
||||
attachment?.let { id ->
|
||||
messageDataProvider.setAttachmentState(AttachmentState.FAILED, id, databaseMessageID)
|
||||
} ?: run {
|
||||
messageDataProvider.setAttachmentState(AttachmentState.FAILED, AttachmentId(attachmentID,0), databaseMessageID)
|
||||
}
|
||||
this.handlePermanentFailure(exception)
|
||||
} else {
|
||||
this.handleFailure(exception)
|
||||
}
|
||||
}
|
||||
|
||||
if (threadID < 0) {
|
||||
handleFailure(Error.NoThread, null)
|
||||
return
|
||||
}
|
||||
|
||||
val threadRecipient = storage.getRecipientForThread(threadID)
|
||||
val sender = if (messageDataProvider.isMmsOutgoing(databaseMessageID)) {
|
||||
storage.getUserPublicKey()
|
||||
} else {
|
||||
messageDataProvider.getIndividualRecipientForMms(databaseMessageID)?.address?.serialize()
|
||||
}
|
||||
val contact = sender?.let { storage.getContactWithSessionID(it) }
|
||||
if (threadRecipient == null || sender == null || contact == null) {
|
||||
handleFailure(Error.NoSender, null)
|
||||
return
|
||||
}
|
||||
if (!threadRecipient.isGroupRecipient && (!contact.isTrusted && storage.getUserPublicKey() != sender)) {
|
||||
// if we aren't receiving a group message, a message from ourselves (self-send) and the contact sending is not trusted:
|
||||
// do not continue, but do not fail
|
||||
return
|
||||
}
|
||||
|
||||
var tempFile: File? = null
|
||||
try {
|
||||
val attachment = messageDataProvider.getDatabaseAttachment(attachmentID)
|
||||
?: return handleFailure(Error.NoAttachment)
|
||||
messageDataProvider.setAttachmentState(AttachmentState.STARTED, attachmentID, this.databaseMessageID)
|
||||
?: return handleFailure(Error.NoAttachment, null)
|
||||
if (attachment.hasData()) {
|
||||
handleFailure(Error.DuplicateData, attachment.attachmentId)
|
||||
return
|
||||
}
|
||||
messageDataProvider.setAttachmentState(AttachmentState.STARTED, attachment.attachmentId, this.databaseMessageID)
|
||||
tempFile = createTempFile()
|
||||
val threadID = storage.getThreadIdForMms(databaseMessageID)
|
||||
val openGroupV2 = storage.getV2OpenGroup(threadID)
|
||||
if (openGroupV2 == null) {
|
||||
DownloadUtilities.downloadFile(tempFile, attachment.url)
|
||||
@@ -89,7 +128,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
|
||||
handleSuccess()
|
||||
} catch (e: Exception) {
|
||||
tempFile?.delete()
|
||||
return handleFailure(e)
|
||||
return handleFailure(e,null)
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -20,7 +20,7 @@ class JobQueue : JobDelegate {
|
||||
private val jobTimestampMap = ConcurrentHashMap<Long, AtomicInteger>()
|
||||
private val rxDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
|
||||
private val txDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
|
||||
private val attachmentDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
|
||||
private val attachmentDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
|
||||
private val scope = GlobalScope + SupervisorJob()
|
||||
private val queue = Channel<Job>(UNLIMITED)
|
||||
private val pendingJobIds = mutableSetOf<String>()
|
||||
@@ -100,6 +100,23 @@ class JobQueue : JobDelegate {
|
||||
Log.d("Loki", "resumed pending send message $id")
|
||||
}
|
||||
|
||||
fun resumePendingJobs(typeKey: String) {
|
||||
val allPendingJobs = MessagingModuleConfiguration.shared.storage.getAllPendingJobs(typeKey)
|
||||
val pendingJobs = mutableListOf<Job>()
|
||||
for ((id, job) in allPendingJobs) {
|
||||
if (job == null) {
|
||||
// Job failed to deserialize, remove it from the DB
|
||||
handleJobFailedPermanently(id)
|
||||
} else {
|
||||
pendingJobs.add(job)
|
||||
}
|
||||
}
|
||||
pendingJobs.sortedBy { it.id }.forEach { job ->
|
||||
Log.i("Loki", "Resuming pending job of type: ${job::class.simpleName}.")
|
||||
queue.offer(job) // Offer always called on unlimited capacity
|
||||
}
|
||||
}
|
||||
|
||||
fun resumePendingJobs() {
|
||||
if (hasResumedPendingJobs) {
|
||||
Log.d("Loki", "resumePendingJobs() should only be called once.")
|
||||
@@ -114,20 +131,7 @@ class JobQueue : JobDelegate {
|
||||
NotifyPNServerJob.KEY
|
||||
)
|
||||
allJobTypes.forEach { type ->
|
||||
val allPendingJobs = MessagingModuleConfiguration.shared.storage.getAllPendingJobs(type)
|
||||
val pendingJobs = mutableListOf<Job>()
|
||||
for ((id, job) in allPendingJobs) {
|
||||
if (job == null) {
|
||||
// Job failed to deserialize, remove it from the DB
|
||||
handleJobFailedPermanently(id)
|
||||
} else {
|
||||
pendingJobs.add(job)
|
||||
}
|
||||
}
|
||||
pendingJobs.sortedBy { it.id }.forEach { job ->
|
||||
Log.i("Loki", "Resuming pending job of type: ${job::class.simpleName}.")
|
||||
queue.offer(job) // Offer always called on unlimited capacity
|
||||
}
|
||||
resumePendingJobs(type)
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user