From 5d8f036f8211e2c4a225cd2b5bb1ed606cdad5aa Mon Sep 17 00:00:00 2001 From: jubb Date: Tue, 27 Apr 2021 17:29:37 +1000 Subject: [PATCH] fix: attachment downloads and uploads enable multi-threaded attachment handling for messages to speed up download/upload and free up message processing queue. leaving group removes appropriate entries now in threaddb --- .../securesms/database/Storage.kt | 2 +- .../loki/activities/JoinPublicChatActivity.kt | 3 ++ .../securesms/loki/api/PublicChatManager.kt | 1 + .../loki/database/LokiThreadDatabase.kt | 2 +- .../messaging/jobs/AttachmentDownloadJob.kt | 28 +++++++++++-------- .../libsession/messaging/jobs/JobQueue.kt | 19 +++++++++++-- .../messaging/opengroups/OpenGroupAPIV2.kt | 23 +++++++++++---- .../opengroups/OpenGroupMessageV2.kt | 3 +- .../sending_receiving/MessageReceiver.kt | 1 + .../sending_receiving/MessageSender.kt | 18 ++++++++++-- 10 files changed, 76 insertions(+), 24 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index 059d94e175..bdc9a2268b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -580,7 +580,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, val mmsDb = DatabaseFactory.getMmsDatabase(context) val cursor = mmsDb.getMessage(mmsId) val reader = mmsDb.readerFor(cursor) - return reader.current.threadId + return reader.next.threadId } override fun getSessionRequestSentTimestamp(publicKey: String): Long? { diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/activities/JoinPublicChatActivity.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/activities/JoinPublicChatActivity.kt index b6e9443e9c..e252ce7fdc 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/activities/JoinPublicChatActivity.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/activities/JoinPublicChatActivity.kt @@ -189,6 +189,9 @@ class EnterChatURLFragment : Fragment() { } chip.chipIcon = drawable chip.text = defaultGroup.name + chip.setOnClickListener { + (requireActivity() as JoinPublicChatActivity).joinPublicChatIfPossible(defaultGroup.toJoinUrl()) + } defaultRoomsGridLayout.addView(chip) } if (groups.size and 1 != 0) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt index 129b8d83b6..82d877b7ad 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt @@ -172,6 +172,7 @@ class PublicChatManager(private val context: Context) { DatabaseFactory.getLokiThreadDatabase(context).removePublicChat(threadID) pollers.remove(threadID)?.stop() + v2Pollers.remove(threadID)?.stop() observers.remove(threadID) startPollersIfNeeded() } diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiThreadDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiThreadDatabase.kt index bbaf79887e..44c550b364 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiThreadDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiThreadDatabase.kt @@ -89,7 +89,7 @@ class LokiThreadDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa return null } val database = databaseHelper.readableDatabase - return database.get(publicChat, "${Companion.threadID} = ?", arrayOf(threadID.toString())) { cursor -> + return database.get(publicChatTable, "${Companion.threadID} = ?", arrayOf(threadID.toString())) { cursor -> val json = cursor.getString(publicChat) OpenGroupV2.fromJson(json) } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt index 4c48b44977..465e7f8eed 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt @@ -1,7 +1,9 @@ package org.session.libsession.messaging.jobs +import okhttp3.HttpUrl import org.session.libsession.messaging.MessagingConfiguration import org.session.libsession.messaging.fileserver.FileServerAPI +import org.session.libsession.messaging.opengroups.OpenGroupAPIV2 import org.session.libsession.messaging.sending_receiving.attachments.AttachmentState import org.session.libsession.messaging.utilities.DotNetAPI import org.session.libsignal.service.api.crypto.AttachmentCipherInputStream @@ -10,7 +12,7 @@ import org.session.libsignal.utilities.logging.Log import java.io.File import java.io.FileInputStream -class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long): Job { +class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) : Job { override var delegate: JobDelegate? = null override var id: String? = null @@ -25,6 +27,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) // Settings override val maxFailureCount: Int = 20 + companion object { val KEY: String = "AttachmentDownloadJob" @@ -35,7 +38,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) override fun execute() { val handleFailure: (java.lang.Exception) -> Unit = { exception -> - if(exception is Error && exception == Error.NoAttachment) { + if (exception is Error && exception == Error.NoAttachment) { MessagingConfiguration.shared.messageDataProvider.setAttachmentState(AttachmentState.FAILED, attachmentID, databaseMessageID) this.handlePermanentFailure(exception) } else if (exception is DotNetAPI.Error && exception == DotNetAPI.Error.ParsingFailed) { @@ -49,28 +52,31 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) } try { val messageDataProvider = MessagingConfiguration.shared.messageDataProvider - val attachment = messageDataProvider.getDatabaseAttachment(attachmentID) ?: return handleFailure(Error.NoAttachment) + val attachment = messageDataProvider.getDatabaseAttachment(attachmentID) + ?: return handleFailure(Error.NoAttachment) messageDataProvider.setAttachmentState(AttachmentState.STARTED, attachmentID, this.databaseMessageID) val tempFile = createTempFile() val threadId = MessagingConfiguration.shared.storage.getThreadIdForMms(databaseMessageID) val openGroupV2 = MessagingConfiguration.shared.storage.getV2OpenGroup(threadId.toString()) - val isOpenGroupV2 = false - if (!isOpenGroupV2) { + val stream = if (openGroupV2 == null) { FileServerAPI.shared.downloadFile(tempFile, attachment.url, MAX_ATTACHMENT_SIZE, null) // DECRYPTION // Assume we're retrieving an attachment for an open group server if the digest is not set - val stream = if (attachment.digest?.size ?: 0 == 0 || attachment.key.isNullOrEmpty()) FileInputStream(tempFile) + if (attachment.digest?.size ?: 0 == 0 || attachment.key.isNullOrEmpty()) FileInputStream(tempFile) else AttachmentCipherInputStream.createForAttachment(tempFile, attachment.size, Base64.decode(attachment.key), attachment.digest) - - messageDataProvider.insertAttachment(databaseMessageID, attachment.attachmentId, stream) } else { -// val bytes = OpenGroupAPIV2.download() + val url = HttpUrl.parse(attachment.url)!! + val fileId = url.pathSegments().last() + OpenGroupAPIV2.download(fileId.toLong(), openGroupV2.room, openGroupV2.server).get().let { + tempFile.writeBytes(it) + } + FileInputStream(tempFile) } - + messageDataProvider.insertAttachment(databaseMessageID, attachment.attachmentId, stream) tempFile.delete() handleSuccess() } catch (e: Exception) { @@ -109,7 +115,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) return KEY } - class Factory: Job.Factory { + class Factory : Job.Factory { override fun create(data: Data): AttachmentDownloadJob { return AttachmentDownloadJob(data.getLong(KEY_ATTACHMENT_ID), data.getLong(KEY_TS_INCOMING_MESSAGE_ID)) } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index d56bdfd9b7..8f334120cb 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -21,6 +21,7 @@ class JobQueue : JobDelegate { private val jobTimestampMap = ConcurrentHashMap() private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + private val multiDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher() private val scope = GlobalScope + SupervisorJob() private val queue = Channel(UNLIMITED) val timer = Timer() @@ -30,8 +31,15 @@ class JobQueue : JobDelegate { scope.launch(dispatcher) { while (isActive) { queue.receive().let { job -> - job.delegate = this@JobQueue - job.execute() + if (job.canExecuteParallel()) { + launch(multiDispatcher) { + job.delegate = this@JobQueue + job.execute() + } + } else { + job.delegate = this@JobQueue + job.execute() + } } } } @@ -42,6 +50,13 @@ class JobQueue : JobDelegate { val shared: JobQueue by lazy { JobQueue() } } + private fun Job.canExecuteParallel(): Boolean { + return this.javaClass in arrayOf( + AttachmentUploadJob::class.java, + AttachmentDownloadJob::class.java + ) + } + fun add(job: Job) { addWithoutExecuting(job) queue.offer(job) // offer always called on unlimited capacity diff --git a/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupAPIV2.kt b/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupAPIV2.kt index 23117b1ac8..1f0f3af799 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupAPIV2.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupAPIV2.kt @@ -2,6 +2,7 @@ package org.session.libsession.messaging.opengroups import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.asSharedFlow import nl.komponents.kovenant.Kovenant import nl.komponents.kovenant.Promise import nl.komponents.kovenant.functional.bind @@ -35,6 +36,8 @@ object OpenGroupAPIV2 { const val DEFAULT_SERVER = "https://sog.ibolpap.finance" private const val DEFAULT_SERVER_PUBLIC_KEY = "b464aa186530c97d6bcf663a3a3b7465a5f782beaa67c83bee99468824b4aa10" + // https://sog.ibolpap.finance/main?public_key=b464aa186530c97d6bcf663a3a3b7465a5f782beaa67c83bee99468824b4aa10 + val defaultRooms = MutableSharedFlow>(replay = 1) private val sharedContext = Kovenant.createContext() @@ -51,7 +54,9 @@ object OpenGroupAPIV2 { data class DefaultGroup(val id: String, val name: String, - val image: ByteArray?) + val image: ByteArray?) { + fun toJoinUrl(): String = "$DEFAULT_SERVER/$id?public_key=$DEFAULT_SERVER_PUBLIC_KEY" + } data class Info( val id: String, @@ -120,9 +125,13 @@ object OpenGroupAPIV2 { ?: return Promise.ofFail(Error.NO_PUBLIC_KEY) return OnionRequestAPI.sendOnionRequest(requestBuilder.build(), request.server, publicKey) .fail { e -> - if (e is OnionRequestAPI.HTTPRequestFailedAtDestinationException - && e.statusCode == 401) { - MessagingConfiguration.shared.storage.removeAuthToken(request.server) + if (e is OnionRequestAPI.HTTPRequestFailedAtDestinationException && e.statusCode == 401) { + val storage = MessagingConfiguration.shared.storage + if (request.room != null) { + storage.removeAuthToken("${request.server}.${request.room}") + } else { + storage.removeAuthToken(request.server) + } } } } else { @@ -353,7 +362,11 @@ object OpenGroupAPIV2 { val earlyGroups = groups.map { group -> DefaultGroup(group.id, group.name, null) } - defaultRooms.tryEmit(earlyGroups) // TODO: take into account cached w/ images groups + defaultRooms.replayCache.firstOrNull()?.let { groups -> + if (groups.none { it.image?.isNotEmpty() == true}) { + defaultRooms.tryEmit(earlyGroups) + } + } val images = groups.map { group -> group.id to downloadOpenGroupProfilePicture(group.id, DEFAULT_SERVER) }.toMap() diff --git a/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupMessageV2.kt b/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupMessageV2.kt index b7c72b3cf4..19ba4341f4 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupMessageV2.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupMessageV2.kt @@ -1,6 +1,7 @@ package org.session.libsession.messaging.opengroups import org.session.libsession.messaging.MessagingConfiguration +import org.session.libsignal.service.internal.push.PushTransportDetails import org.session.libsignal.service.internal.push.SignalServiceProtos import org.session.libsignal.utilities.Base64 import org.session.libsignal.utilities.Base64.decode @@ -61,7 +62,7 @@ data class OpenGroupMessageV2( return jsonMap } - fun toProto(): SignalServiceProtos.DataMessage = decode(base64EncodedData).let { bytes -> + fun toProto(): SignalServiceProtos.DataMessage = decode(base64EncodedData).let(PushTransportDetails::getStrippedPaddingMessageBody).let { bytes -> SignalServiceProtos.DataMessage.parseFrom(bytes) } diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageReceiver.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageReceiver.kt index 9cfbfaa720..770376133e 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageReceiver.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageReceiver.kt @@ -36,6 +36,7 @@ object MessageReceiver { is UnknownEnvelopeType -> false is InvalidSignature -> false is NoData -> false + is NoThread -> false is SenderBlocked -> false is SelfSend -> false else -> true diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt index 733063d797..c6384fd47a 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt @@ -113,9 +113,9 @@ object MessageSender { if (message is VisibleMessage) { val displayName = storage.getUserDisplayName()!! val profileKey = storage.getUserProfileKey() - val profilePrictureUrl = storage.getUserProfilePictureURL() - if (profileKey != null && profilePrictureUrl != null) { - message.profile = Profile(displayName, profileKey, profilePrictureUrl) + val profilePictureUrl = storage.getUserProfilePictureURL() + if (profileKey != null && profilePictureUrl != null) { + message.profile = Profile(displayName, profileKey, profilePictureUrl) } else { message.profile = Profile(displayName) } @@ -243,6 +243,18 @@ object MessageSender { val server = destination.server val room = destination.room + // Attach the user's profile if needed + if (message is VisibleMessage) { + val displayName = storage.getUserDisplayName()!! + val profileKey = storage.getUserProfileKey() + val profilePictureUrl = storage.getUserProfilePictureURL() + if (profileKey != null && profilePictureUrl != null) { + message.profile = Profile(displayName, profileKey, profilePictureUrl) + } else { + message.profile = Profile(displayName) + } + } + // Validate the message if (message !is VisibleMessage || !message.isValid()) { throw Error.InvalidMessage