fix: attachment downloads and uploads

enable multi-threaded attachment handling for messages to speed up download/upload and free up message processing queue.

leaving group removes appropriate entries now in threaddb
This commit is contained in:
jubb 2021-04-27 17:29:37 +10:00
parent d3bd844d82
commit 5d8f036f82
10 changed files with 76 additions and 24 deletions

View File

@ -580,7 +580,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
val mmsDb = DatabaseFactory.getMmsDatabase(context)
val cursor = mmsDb.getMessage(mmsId)
val reader = mmsDb.readerFor(cursor)
return reader.current.threadId
return reader.next.threadId
}
override fun getSessionRequestSentTimestamp(publicKey: String): Long? {

View File

@ -189,6 +189,9 @@ class EnterChatURLFragment : Fragment() {
}
chip.chipIcon = drawable
chip.text = defaultGroup.name
chip.setOnClickListener {
(requireActivity() as JoinPublicChatActivity).joinPublicChatIfPossible(defaultGroup.toJoinUrl())
}
defaultRoomsGridLayout.addView(chip)
}
if (groups.size and 1 != 0) {

View File

@ -172,6 +172,7 @@ class PublicChatManager(private val context: Context) {
DatabaseFactory.getLokiThreadDatabase(context).removePublicChat(threadID)
pollers.remove(threadID)?.stop()
v2Pollers.remove(threadID)?.stop()
observers.remove(threadID)
startPollersIfNeeded()
}

View File

@ -89,7 +89,7 @@ class LokiThreadDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
return null
}
val database = databaseHelper.readableDatabase
return database.get(publicChat, "${Companion.threadID} = ?", arrayOf(threadID.toString())) { cursor ->
return database.get(publicChatTable, "${Companion.threadID} = ?", arrayOf(threadID.toString())) { cursor ->
val json = cursor.getString(publicChat)
OpenGroupV2.fromJson(json)
}

View File

@ -1,7 +1,9 @@
package org.session.libsession.messaging.jobs
import okhttp3.HttpUrl
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.fileserver.FileServerAPI
import org.session.libsession.messaging.opengroups.OpenGroupAPIV2
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentState
import org.session.libsession.messaging.utilities.DotNetAPI
import org.session.libsignal.service.api.crypto.AttachmentCipherInputStream
@ -10,7 +12,7 @@ import org.session.libsignal.utilities.logging.Log
import java.io.File
import java.io.FileInputStream
class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long): Job {
class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) : Job {
override var delegate: JobDelegate? = null
override var id: String? = null
@ -25,6 +27,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
// Settings
override val maxFailureCount: Int = 20
companion object {
val KEY: String = "AttachmentDownloadJob"
@ -35,7 +38,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
override fun execute() {
val handleFailure: (java.lang.Exception) -> Unit = { exception ->
if(exception is Error && exception == Error.NoAttachment) {
if (exception is Error && exception == Error.NoAttachment) {
MessagingConfiguration.shared.messageDataProvider.setAttachmentState(AttachmentState.FAILED, attachmentID, databaseMessageID)
this.handlePermanentFailure(exception)
} else if (exception is DotNetAPI.Error && exception == DotNetAPI.Error.ParsingFailed) {
@ -49,28 +52,31 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
}
try {
val messageDataProvider = MessagingConfiguration.shared.messageDataProvider
val attachment = messageDataProvider.getDatabaseAttachment(attachmentID) ?: return handleFailure(Error.NoAttachment)
val attachment = messageDataProvider.getDatabaseAttachment(attachmentID)
?: return handleFailure(Error.NoAttachment)
messageDataProvider.setAttachmentState(AttachmentState.STARTED, attachmentID, this.databaseMessageID)
val tempFile = createTempFile()
val threadId = MessagingConfiguration.shared.storage.getThreadIdForMms(databaseMessageID)
val openGroupV2 = MessagingConfiguration.shared.storage.getV2OpenGroup(threadId.toString())
val isOpenGroupV2 = false
if (!isOpenGroupV2) {
val stream = if (openGroupV2 == null) {
FileServerAPI.shared.downloadFile(tempFile, attachment.url, MAX_ATTACHMENT_SIZE, null)
// DECRYPTION
// Assume we're retrieving an attachment for an open group server if the digest is not set
val stream = if (attachment.digest?.size ?: 0 == 0 || attachment.key.isNullOrEmpty()) FileInputStream(tempFile)
if (attachment.digest?.size ?: 0 == 0 || attachment.key.isNullOrEmpty()) FileInputStream(tempFile)
else AttachmentCipherInputStream.createForAttachment(tempFile, attachment.size, Base64.decode(attachment.key), attachment.digest)
messageDataProvider.insertAttachment(databaseMessageID, attachment.attachmentId, stream)
} else {
// val bytes = OpenGroupAPIV2.download()
val url = HttpUrl.parse(attachment.url)!!
val fileId = url.pathSegments().last()
OpenGroupAPIV2.download(fileId.toLong(), openGroupV2.room, openGroupV2.server).get().let {
tempFile.writeBytes(it)
}
FileInputStream(tempFile)
}
messageDataProvider.insertAttachment(databaseMessageID, attachment.attachmentId, stream)
tempFile.delete()
handleSuccess()
} catch (e: Exception) {
@ -109,7 +115,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
return KEY
}
class Factory: Job.Factory<AttachmentDownloadJob> {
class Factory : Job.Factory<AttachmentDownloadJob> {
override fun create(data: Data): AttachmentDownloadJob {
return AttachmentDownloadJob(data.getLong(KEY_ATTACHMENT_ID), data.getLong(KEY_TS_INCOMING_MESSAGE_ID))
}

View File

@ -21,6 +21,7 @@ class JobQueue : JobDelegate {
private val jobTimestampMap = ConcurrentHashMap<Long, AtomicInteger>()
private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private val multiDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
private val scope = GlobalScope + SupervisorJob()
private val queue = Channel<Job>(UNLIMITED)
val timer = Timer()
@ -30,9 +31,16 @@ class JobQueue : JobDelegate {
scope.launch(dispatcher) {
while (isActive) {
queue.receive().let { job ->
if (job.canExecuteParallel()) {
launch(multiDispatcher) {
job.delegate = this@JobQueue
job.execute()
}
} else {
job.delegate = this@JobQueue
job.execute()
}
}
}
}
}
@ -42,6 +50,13 @@ class JobQueue : JobDelegate {
val shared: JobQueue by lazy { JobQueue() }
}
private fun Job.canExecuteParallel(): Boolean {
return this.javaClass in arrayOf(
AttachmentUploadJob::class.java,
AttachmentDownloadJob::class.java
)
}
fun add(job: Job) {
addWithoutExecuting(job)
queue.offer(job) // offer always called on unlimited capacity

View File

@ -2,6 +2,7 @@ package org.session.libsession.messaging.opengroups
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import nl.komponents.kovenant.Kovenant
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.functional.bind
@ -35,6 +36,8 @@ object OpenGroupAPIV2 {
const val DEFAULT_SERVER = "https://sog.ibolpap.finance"
private const val DEFAULT_SERVER_PUBLIC_KEY = "b464aa186530c97d6bcf663a3a3b7465a5f782beaa67c83bee99468824b4aa10"
// https://sog.ibolpap.finance/main?public_key=b464aa186530c97d6bcf663a3a3b7465a5f782beaa67c83bee99468824b4aa10
val defaultRooms = MutableSharedFlow<List<DefaultGroup>>(replay = 1)
private val sharedContext = Kovenant.createContext()
@ -51,7 +54,9 @@ object OpenGroupAPIV2 {
data class DefaultGroup(val id: String,
val name: String,
val image: ByteArray?)
val image: ByteArray?) {
fun toJoinUrl(): String = "$DEFAULT_SERVER/$id?public_key=$DEFAULT_SERVER_PUBLIC_KEY"
}
data class Info(
val id: String,
@ -120,9 +125,13 @@ object OpenGroupAPIV2 {
?: return Promise.ofFail(Error.NO_PUBLIC_KEY)
return OnionRequestAPI.sendOnionRequest(requestBuilder.build(), request.server, publicKey)
.fail { e ->
if (e is OnionRequestAPI.HTTPRequestFailedAtDestinationException
&& e.statusCode == 401) {
MessagingConfiguration.shared.storage.removeAuthToken(request.server)
if (e is OnionRequestAPI.HTTPRequestFailedAtDestinationException && e.statusCode == 401) {
val storage = MessagingConfiguration.shared.storage
if (request.room != null) {
storage.removeAuthToken("${request.server}.${request.room}")
} else {
storage.removeAuthToken(request.server)
}
}
}
} else {
@ -353,7 +362,11 @@ object OpenGroupAPIV2 {
val earlyGroups = groups.map { group ->
DefaultGroup(group.id, group.name, null)
}
defaultRooms.tryEmit(earlyGroups) // TODO: take into account cached w/ images groups
defaultRooms.replayCache.firstOrNull()?.let { groups ->
if (groups.none { it.image?.isNotEmpty() == true}) {
defaultRooms.tryEmit(earlyGroups)
}
}
val images = groups.map { group ->
group.id to downloadOpenGroupProfilePicture(group.id, DEFAULT_SERVER)
}.toMap()

View File

@ -1,6 +1,7 @@
package org.session.libsession.messaging.opengroups
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsignal.service.internal.push.PushTransportDetails
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Base64.decode
@ -61,7 +62,7 @@ data class OpenGroupMessageV2(
return jsonMap
}
fun toProto(): SignalServiceProtos.DataMessage = decode(base64EncodedData).let { bytes ->
fun toProto(): SignalServiceProtos.DataMessage = decode(base64EncodedData).let(PushTransportDetails::getStrippedPaddingMessageBody).let { bytes ->
SignalServiceProtos.DataMessage.parseFrom(bytes)
}

View File

@ -36,6 +36,7 @@ object MessageReceiver {
is UnknownEnvelopeType -> false
is InvalidSignature -> false
is NoData -> false
is NoThread -> false
is SenderBlocked -> false
is SelfSend -> false
else -> true

View File

@ -113,9 +113,9 @@ object MessageSender {
if (message is VisibleMessage) {
val displayName = storage.getUserDisplayName()!!
val profileKey = storage.getUserProfileKey()
val profilePrictureUrl = storage.getUserProfilePictureURL()
if (profileKey != null && profilePrictureUrl != null) {
message.profile = Profile(displayName, profileKey, profilePrictureUrl)
val profilePictureUrl = storage.getUserProfilePictureURL()
if (profileKey != null && profilePictureUrl != null) {
message.profile = Profile(displayName, profileKey, profilePictureUrl)
} else {
message.profile = Profile(displayName)
}
@ -243,6 +243,18 @@ object MessageSender {
val server = destination.server
val room = destination.room
// Attach the user's profile if needed
if (message is VisibleMessage) {
val displayName = storage.getUserDisplayName()!!
val profileKey = storage.getUserProfileKey()
val profilePictureUrl = storage.getUserProfilePictureURL()
if (profileKey != null && profilePictureUrl != null) {
message.profile = Profile(displayName, profileKey, profilePictureUrl)
} else {
message.profile = Profile(displayName)
}
}
// Validate the message
if (message !is VisibleMessage || !message.isValid()) {
throw Error.InvalidMessage