From 0da949c8e69e8cc8803a4a4c34d7e0ed74b74f9e Mon Sep 17 00:00:00 2001 From: Fanchao Liu <273191+simophin@users.noreply.github.com> Date: Mon, 1 Jul 2024 17:36:50 +1000 Subject: [PATCH] [SES-1966] Attachment batch download and tidy-up (#1507) * Attachment batch download * Addressed feedback and test issues * Feedback fixes * timedWindow for flow * Feedback * Dispatchers * Remove `flowOn` * New implementation of timedBuffer * Organise import * Feedback * Fix test * Tidied up logic around `eligibleForDownload` * Updated comment --------- Co-authored-by: fanchao --- .../v2/AttachmentDownloadHandler.kt | 115 ++++++++++++++++++ .../conversation/v2/ConversationActivityV2.kt | 6 +- .../conversation/v2/ConversationAdapter.kt | 3 +- .../conversation/v2/ConversationViewModel.kt | 38 ++++-- .../conversation/v2/MessageDetailActivity.kt | 3 +- .../v2/MessageDetailsViewModel.kt | 6 +- .../v2/components/AlbumThumbnailView.kt | 4 +- .../v2/messages/VisibleMessageContentView.kt | 14 +-- .../v2/messages/VisibleMessageView.kt | 3 +- .../securesms/database/SessionJobDatabase.kt | 11 +- .../securesms/database/Storage.kt | 4 +- .../thoughtcrime/securesms/util/FlowUtils.kt | 44 +++++++ .../v2/ConversationViewModelTest.kt | 4 +- .../securesms/util/FlowUtilsTest.kt | 52 ++++++++ .../libsession/database/StorageProtocol.kt | 2 +- .../messaging/jobs/AttachmentDownloadJob.kt | 48 +++++--- .../libsession/messaging/jobs/JobQueue.kt | 2 +- 17 files changed, 301 insertions(+), 58 deletions(-) create mode 100644 app/src/main/java/org/thoughtcrime/securesms/conversation/v2/AttachmentDownloadHandler.kt create mode 100644 app/src/main/java/org/thoughtcrime/securesms/util/FlowUtils.kt create mode 100644 app/src/test/java/org/thoughtcrime/securesms/util/FlowUtilsTest.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/AttachmentDownloadHandler.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/AttachmentDownloadHandler.kt new file mode 100644 index 0000000000..7278be71bc --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/AttachmentDownloadHandler.kt @@ -0,0 +1,115 @@ +package org.thoughtcrime.securesms.conversation.v2 + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.receiveAsFlow +import kotlinx.coroutines.launch +import kotlinx.coroutines.plus +import org.session.libsession.database.MessageDataProvider +import org.session.libsession.database.StorageProtocol +import org.session.libsession.messaging.jobs.AttachmentDownloadJob +import org.session.libsession.messaging.jobs.AttachmentUploadJob +import org.session.libsession.messaging.jobs.JobQueue +import org.session.libsession.messaging.sending_receiving.attachments.AttachmentTransferProgress +import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment +import org.session.libsignal.utilities.Log +import org.thoughtcrime.securesms.util.flatten +import org.thoughtcrime.securesms.util.timedBuffer + +/** + * [AttachmentDownloadHandler] is responsible for handling attachment download requests. These + * requests will go through different level of checking before they are queued for download. + * + * To use this handler, call [onAttachmentDownloadRequest] with the attachment that needs to be + * downloaded. The call to [onAttachmentDownloadRequest] is cheap and can be called multiple times. + */ +class AttachmentDownloadHandler( + private val storage: StorageProtocol, + private val messageDataProvider: MessageDataProvider, + jobQueue: JobQueue = JobQueue.shared, + scope: CoroutineScope = CoroutineScope(Dispatchers.Default) + SupervisorJob(), +) { + companion object { + private const val BUFFER_TIMEOUT_MILLS = 500L + private const val BUFFER_MAX_ITEMS = 10 + private const val LOG_TAG = "AttachmentDownloadHelper" + } + + private val downloadRequests = Channel(UNLIMITED) + + init { + scope.launch(Dispatchers.Default) { + downloadRequests + .receiveAsFlow() + .timedBuffer(BUFFER_TIMEOUT_MILLS, BUFFER_MAX_ITEMS) + .map(::filterEligibleAttachments) + .flatten() + .collect { attachment -> + jobQueue.add( + AttachmentDownloadJob( + attachmentID = attachment.attachmentId.rowId, + databaseMessageID = attachment.mmsId + ) + ) + } + } + } + + /** + * Filter attachments that are eligible for creating download jobs. + * + */ + private fun filterEligibleAttachments(attachments: List): List { + val pendingAttachmentIDs = storage + .getAllPendingJobs(AttachmentDownloadJob.KEY, AttachmentUploadJob.KEY) + .values + .mapNotNull { + (it as? AttachmentUploadJob)?.attachmentID + ?: (it as? AttachmentDownloadJob)?.attachmentID + } + .toSet() + + + return attachments.filter { attachment -> + eligibleForDownloadTask( + attachment, + pendingAttachmentIDs, + ) + } + } + + /** + * Check if the attachment is eligible for download task. + */ + private fun eligibleForDownloadTask( + attachment: DatabaseAttachment, + pendingJobsAttachmentRowIDs: Set, + ): Boolean { + if (attachment.attachmentId.rowId in pendingJobsAttachmentRowIDs) { + return false + } + + val threadID = storage.getThreadIdForMms(attachment.mmsId) + + return AttachmentDownloadJob.eligibleForDownload( + threadID, storage, messageDataProvider, attachment.mmsId, + ) + } + + + fun onAttachmentDownloadRequest(attachment: DatabaseAttachment) { + if (attachment.transferState != AttachmentTransferProgress.TRANSFER_PROGRESS_PENDING) { + Log.i( + LOG_TAG, + "Attachment ${attachment.attachmentId} is not pending, skipping download" + ) + return + } + + downloadRequests.trySend(attachment) + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationActivityV2.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationActivityV2.kt index 7fdca6936e..e7f413b6fc 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationActivityV2.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationActivityV2.kt @@ -333,11 +333,7 @@ class ConversationActivityV2 : PassphraseRequiredActionBarActivity(), InputBarDe onDeselect(message, position, it) } }, - onAttachmentNeedsDownload = { attachmentId, mmsId -> - lifecycleScope.launch(Dispatchers.IO) { - JobQueue.shared.add(AttachmentDownloadJob(attachmentId, mmsId)) - } - }, + onAttachmentNeedsDownload = viewModel::onAttachmentDownloadRequest, glide = glide, lifecycleCoroutineScope = lifecycleScope ) diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationAdapter.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationAdapter.kt index d051d7d93c..340336e53b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationAdapter.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationAdapter.kt @@ -19,6 +19,7 @@ import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import network.loki.messenger.R import org.session.libsession.messaging.contacts.Contact +import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment import org.thoughtcrime.securesms.conversation.v2.messages.ControlMessageView import org.thoughtcrime.securesms.conversation.v2.messages.VisibleMessageView import org.thoughtcrime.securesms.conversation.v2.messages.VisibleMessageViewDelegate @@ -40,7 +41,7 @@ class ConversationAdapter( private val onItemSwipeToReply: (MessageRecord, Int) -> Unit, private val onItemLongPress: (MessageRecord, Int, VisibleMessageView) -> Unit, private val onDeselect: (MessageRecord, Int) -> Unit, - private val onAttachmentNeedsDownload: (Long, Long) -> Unit, + private val onAttachmentNeedsDownload: (DatabaseAttachment) -> Unit, private val glide: GlideRequests, lifecycleCoroutineScope: LifecycleCoroutineScope ) : CursorRecyclerViewAdapter(context, cursor) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModel.kt index 1a036eee11..b29f9e1b49 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModel.kt @@ -1,46 +1,44 @@ package org.thoughtcrime.securesms.conversation.v2 import android.content.Context - import androidx.lifecycle.ViewModel import androidx.lifecycle.ViewModelProvider import androidx.lifecycle.viewModelScope - import com.goterl.lazysodium.utils.KeyPair - import dagger.assisted.Assisted import dagger.assisted.AssistedInject - import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.update import kotlinx.coroutines.launch - +import org.session.libsession.database.MessageDataProvider import org.session.libsession.messaging.messages.ExpirationConfiguration import org.session.libsession.messaging.open_groups.OpenGroup import org.session.libsession.messaging.open_groups.OpenGroupApi +import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment import org.session.libsession.messaging.utilities.SessionId import org.session.libsession.messaging.utilities.SodiumUtilities import org.session.libsession.utilities.Address import org.session.libsession.utilities.recipients.Recipient import org.session.libsignal.utilities.IdPrefix import org.session.libsignal.utilities.Log +import org.thoughtcrime.securesms.database.MmsDatabase import org.thoughtcrime.securesms.audio.AudioSlidePlayer - import org.thoughtcrime.securesms.database.Storage import org.thoughtcrime.securesms.database.model.MessageRecord import org.thoughtcrime.securesms.database.model.MmsMessageRecord import org.thoughtcrime.securesms.repository.ConversationRepository - import java.util.UUID class ConversationViewModel( val threadId: Long, val edKeyPair: KeyPair?, private val repository: ConversationRepository, - private val storage: Storage + private val storage: Storage, + private val messageDataProvider: MessageDataProvider, + database: MmsDatabase, ) : ViewModel() { val showSendAfterApprovalText: Boolean @@ -92,6 +90,11 @@ class ConversationViewModel( // allow reactions if the open group is null (normal conversations) or the open group's capabilities include reactions get() = (openGroup == null || OpenGroupApi.Capability.REACTIONS.name.lowercase() in serverCapabilities) + private val attachmentDownloadHandler = AttachmentDownloadHandler( + storage = storage, + messageDataProvider = messageDataProvider, + scope = viewModelScope, + ) init { viewModelScope.launch(Dispatchers.IO) { @@ -242,7 +245,7 @@ class ConversationViewModel( currentUiState.copy(uiMessages = messages) } } - + fun messageShown(messageId: Long) { _uiState.update { currentUiState -> val messages = currentUiState.uiMessages.filterNot { it.id == messageId } @@ -265,6 +268,10 @@ class ConversationViewModel( storage.getLastLegacyRecipient(address.serialize())?.let { Recipient.from(context, Address.fromSerialized(it), false) } } + fun onAttachmentDownloadRequest(attachment: DatabaseAttachment) { + attachmentDownloadHandler.onAttachmentDownloadRequest(attachment) + } + @dagger.assisted.AssistedFactory interface AssistedFactory { fun create(threadId: Long, edKeyPair: KeyPair?): Factory @@ -275,11 +282,20 @@ class ConversationViewModel( @Assisted private val threadId: Long, @Assisted private val edKeyPair: KeyPair?, private val repository: ConversationRepository, - private val storage: Storage + private val storage: Storage, + private val mmsDatabase: MmsDatabase, + private val messageDataProvider: MessageDataProvider, ) : ViewModelProvider.Factory { override fun create(modelClass: Class): T { - return ConversationViewModel(threadId, edKeyPair, repository, storage) as T + return ConversationViewModel( + threadId = threadId, + edKeyPair = edKeyPair, + repository = repository, + storage = storage, + messageDataProvider = messageDataProvider, + database = mmsDatabase + ) as T } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/MessageDetailActivity.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/MessageDetailActivity.kt index d5e28fb936..925968f95b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/MessageDetailActivity.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/MessageDetailActivity.kt @@ -55,6 +55,7 @@ import dagger.hilt.android.AndroidEntryPoint import kotlinx.coroutines.launch import network.loki.messenger.R import network.loki.messenger.databinding.ViewVisibleMessageContentBinding +import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment import org.thoughtcrime.securesms.MediaPreviewActivity.getPreviewIntent import org.thoughtcrime.securesms.PassphraseRequiredActionBarActivity import org.thoughtcrime.securesms.database.Storage @@ -149,7 +150,7 @@ fun MessageDetails( onResend: (() -> Unit)? = null, onDelete: () -> Unit = {}, onClickImage: (Int) -> Unit = {}, - onAttachmentNeedsDownload: (Long, Long) -> Unit = { _, _ -> } + onAttachmentNeedsDownload: (DatabaseAttachment) -> Unit = { _ -> } ) { Column( modifier = Modifier diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/MessageDetailsViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/MessageDetailsViewModel.kt index ba153a6b36..fc54b652ae 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/MessageDetailsViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/MessageDetailsViewModel.kt @@ -124,7 +124,7 @@ class MessageDetailsViewModel @Inject constructor( if (slide.transferState == AttachmentTransferProgress.TRANSFER_PROGRESS_FAILED) { // Restart download here (on IO thread) (slide.asAttachment() as? DatabaseAttachment)?.let { attachment -> - onAttachmentNeedsDownload(attachment.attachmentId.rowId, state.mmsRecord.getId()) + onAttachmentNeedsDownload(attachment) } } @@ -137,9 +137,9 @@ class MessageDetailsViewModel @Inject constructor( } } - fun onAttachmentNeedsDownload(attachmentId: Long, mmsId: Long) { + fun onAttachmentNeedsDownload(attachment: DatabaseAttachment) { viewModelScope.launch(Dispatchers.IO) { - JobQueue.shared.add(AttachmentDownloadJob(attachmentId, mmsId)) + JobQueue.shared.add(AttachmentDownloadJob(attachment.attachmentId.rowId, attachment.mmsId)) } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/components/AlbumThumbnailView.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/components/AlbumThumbnailView.kt index f646ace972..57c42a7719 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/components/AlbumThumbnailView.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/components/AlbumThumbnailView.kt @@ -48,7 +48,7 @@ class AlbumThumbnailView : RelativeLayout { // region Interaction - fun calculateHitObject(event: MotionEvent, mms: MmsMessageRecord, threadRecipient: Recipient, onAttachmentNeedsDownload: (Long, Long) -> Unit) { + fun calculateHitObject(event: MotionEvent, mms: MmsMessageRecord, threadRecipient: Recipient, onAttachmentNeedsDownload: (DatabaseAttachment) -> Unit) { val rawXInt = event.rawX.toInt() val rawYInt = event.rawY.toInt() val eventRect = Rect(rawXInt, rawYInt, rawXInt, rawYInt) @@ -63,7 +63,7 @@ class AlbumThumbnailView : RelativeLayout { if (slide.transferState == AttachmentTransferProgress.TRANSFER_PROGRESS_FAILED) { // Restart download here (on IO thread) (slide.asAttachment() as? DatabaseAttachment)?.let { attachment -> - onAttachmentNeedsDownload(attachment.attachmentId.rowId, mms.getId()) + onAttachmentNeedsDownload(attachment) } } if (slide.isInProgress) return@forEach diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/messages/VisibleMessageContentView.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/messages/VisibleMessageContentView.kt index baf80e56aa..83c6904dec 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/messages/VisibleMessageContentView.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/messages/VisibleMessageContentView.kt @@ -66,7 +66,7 @@ class VisibleMessageContentView : ConstraintLayout { thread: Recipient, searchQuery: String? = null, contactIsTrusted: Boolean = true, - onAttachmentNeedsDownload: (Long, Long) -> Unit, + onAttachmentNeedsDownload: (DatabaseAttachment) -> Unit, suppressThumbnails: Boolean = false ) { // Background @@ -135,19 +135,11 @@ class VisibleMessageContentView : ConstraintLayout { if (message is MmsMessageRecord) { message.slideDeck.asAttachments().forEach { attach -> val dbAttachment = attach as? DatabaseAttachment ?: return@forEach - val attachmentId = dbAttachment.attachmentId.rowId - if (attach.transferState == AttachmentTransferProgress.TRANSFER_PROGRESS_PENDING - && MessagingModuleConfiguration.shared.storage.getAttachmentUploadJob(attachmentId) == null) { - onAttachmentNeedsDownload(attachmentId, dbAttachment.mmsId) - } + onAttachmentNeedsDownload(dbAttachment) } message.linkPreviews.forEach { preview -> val previewThumbnail = preview.getThumbnail().orNull() as? DatabaseAttachment ?: return@forEach - val attachmentId = previewThumbnail.attachmentId.rowId - if (previewThumbnail.transferState == AttachmentTransferProgress.TRANSFER_PROGRESS_PENDING - && MessagingModuleConfiguration.shared.storage.getAttachmentUploadJob(attachmentId) == null) { - onAttachmentNeedsDownload(attachmentId, previewThumbnail.mmsId) - } + onAttachmentNeedsDownload(previewThumbnail) } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/messages/VisibleMessageView.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/messages/VisibleMessageView.kt index ec26e39986..9470d17de5 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/messages/VisibleMessageView.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/messages/VisibleMessageView.kt @@ -34,6 +34,7 @@ import network.loki.messenger.databinding.ViewstubVisibleMessageMarkerContainerB import org.session.libsession.messaging.contacts.Contact import org.session.libsession.messaging.contacts.Contact.ContactContext import org.session.libsession.messaging.open_groups.OpenGroupApi +import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment import org.session.libsession.utilities.Address import org.session.libsession.utilities.ViewUtil import org.session.libsession.utilities.getColorFromAttr @@ -145,7 +146,7 @@ class VisibleMessageView : FrameLayout { senderSessionID: String, lastSeen: Long, delegate: VisibleMessageViewDelegate? = null, - onAttachmentNeedsDownload: (Long, Long) -> Unit, + onAttachmentNeedsDownload: (DatabaseAttachment) -> Unit, lastSentMessageId: Long ) { replyDisabled = message.isOpenGroupInvitation diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/SessionJobDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/database/SessionJobDatabase.kt index 591755b88f..e83c464c7d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/SessionJobDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/SessionJobDatabase.kt @@ -3,6 +3,7 @@ package org.thoughtcrime.securesms.database import android.content.ContentValues import android.content.Context import android.database.Cursor +import org.json.JSONArray import org.session.libsession.messaging.jobs.AttachmentDownloadJob import org.session.libsession.messaging.jobs.AttachmentUploadJob import org.session.libsession.messaging.jobs.BackgroundGroupAddJob @@ -50,14 +51,18 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID )) } - fun getAllJobs(type: String): Map { + fun getAllJobs(vararg types: String): Map { val database = databaseHelper.readableDatabase - return database.getAll(sessionJobTable, "$jobType = ?", arrayOf( type )) { cursor -> + return database.getAll( + sessionJobTable, + "$jobType IN (SELECT value FROM json_each(?))", // Use json_each to bypass limitation of SQLite's IN operator binding + arrayOf( JSONArray(types).toString() ) + ) { cursor -> val jobID = cursor.getString(jobID) try { jobID to jobFromCursor(cursor) } catch (e: Exception) { - Log.e("Loki", "Error deserializing job of type: $type.", e) + Log.e("Loki", "Error deserializing job of type: $types.", e) jobID to null } }.toMap() diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index 354ec05c46..dd0544b420 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -397,8 +397,8 @@ open class Storage( DatabaseComponent.get(context).sessionJobDatabase().markJobAsFailedPermanently(jobId) } - override fun getAllPendingJobs(type: String): Map { - return DatabaseComponent.get(context).sessionJobDatabase().getAllJobs(type) + override fun getAllPendingJobs(vararg types: String): Map { + return DatabaseComponent.get(context).sessionJobDatabase().getAllJobs(*types) } override fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? { diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/FlowUtils.kt b/app/src/main/java/org/thoughtcrime/securesms/util/FlowUtils.kt new file mode 100644 index 0000000000..e5b35b8931 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/util/FlowUtils.kt @@ -0,0 +1,44 @@ +package org.thoughtcrime.securesms.util + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.flow.flatMapConcat + +/** + * Buffers items from the flow and emits them in batches. The batch will have size [maxItems] and + * time [timeoutMillis] limit. + */ +fun Flow.timedBuffer(timeoutMillis: Long, maxItems: Int): Flow> { + return channelFlow { + val buffer = mutableListOf() + var bufferBeganAt = -1L + + collectLatest { value -> + if (buffer.isEmpty()) { + bufferBeganAt = System.currentTimeMillis() + } + + buffer.add(value) + + if (buffer.size < maxItems) { + // If the buffer is not full, wait until the time limit is reached. + // The delay here, as a suspension point, will be cancelled by `collectLatest`, + // if another item is collected while we are waiting for the `delay` to complete. + // Once the delay is cancelled, another round of `collectLatest` will be restarted. + delay((System.currentTimeMillis() + timeoutMillis - bufferBeganAt).coerceAtLeast(0L)) + } + + // When we reach here, it's either the buffer is full, or the timeout has been reached: + // send out the buffer and reset the state + send(buffer.toList()) + buffer.clear() + } + } +} + +@OptIn(ExperimentalCoroutinesApi::class) +fun Flow>.flatten(): Flow = flatMapConcat { it.asFlow() } \ No newline at end of file diff --git a/app/src/test/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModelTest.kt b/app/src/test/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModelTest.kt index 37303e29d5..7f4db828e4 100644 --- a/app/src/test/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModelTest.kt +++ b/app/src/test/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModelTest.kt @@ -23,6 +23,7 @@ import org.session.libsession.utilities.recipients.Recipient import org.session.libsignal.utilities.Log import org.thoughtcrime.securesms.BaseViewModelTest import org.thoughtcrime.securesms.NoOpLogger +import org.thoughtcrime.securesms.database.MmsDatabase import org.thoughtcrime.securesms.database.Storage import org.thoughtcrime.securesms.database.model.MessageRecord import org.thoughtcrime.securesms.repository.ConversationRepository @@ -32,6 +33,7 @@ class ConversationViewModelTest: BaseViewModelTest() { private val repository = mock() private val storage = mock() + private val mmsDatabase = mock() private val threadId = 123L private val edKeyPair = mock() @@ -39,7 +41,7 @@ class ConversationViewModelTest: BaseViewModelTest() { private lateinit var messageRecord: MessageRecord private val viewModel: ConversationViewModel by lazy { - ConversationViewModel(threadId, edKeyPair, repository, storage) + ConversationViewModel(threadId, edKeyPair, repository, storage, mock(), mmsDatabase) } @Before diff --git a/app/src/test/java/org/thoughtcrime/securesms/util/FlowUtilsTest.kt b/app/src/test/java/org/thoughtcrime/securesms/util/FlowUtilsTest.kt new file mode 100644 index 0000000000..f089586de9 --- /dev/null +++ b/app/src/test/java/org/thoughtcrime/securesms/util/FlowUtilsTest.kt @@ -0,0 +1,52 @@ +package org.thoughtcrime.securesms.util + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.toCollection +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.test.runTest +import org.junit.Assert.assertEquals +import org.junit.Test + +class FlowUtilsTest { + + @Test + fun `timedBuffer should emit buffer when it's full`() = runTest { + // Given + val flow = flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + val timeoutMillis = 1000L + val maxItems = 5 + + // When + val result = flow.timedBuffer(timeoutMillis, maxItems).toList() + + // Then + assertEquals(2, result.size) + assertEquals(listOf(1, 2, 3, 4, 5), result[0]) + assertEquals(listOf(6, 7, 8, 9, 10), result[1]) + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `timedBuffer should emit buffer when timeout expires`() = runTest { + // Given + val flow = flow { + emit(1) + emit(2) + emit(3) + testScheduler.advanceTimeBy(200L) + emit(4) + } + val timeoutMillis = 100L + val maxItems = 5 + + // When + val result = flow.timedBuffer(timeoutMillis, maxItems).toList() + + // Then + assertEquals(2, result.size) + assertEquals(listOf(1, 2, 3), result[0]) + assertEquals(listOf(4), result[1]) + } +} \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt index 260e254fe7..c984594bcc 100644 --- a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt @@ -53,7 +53,7 @@ interface StorageProtocol { fun persistJob(job: Job) fun markJobAsSucceeded(jobId: String) fun markJobAsFailedPermanently(jobId: String) - fun getAllPendingJobs(type: String): Map + fun getAllPendingJobs(vararg types: String): Map fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? fun getMessageSendJob(messageSendJobID: String): MessageSendJob? fun getMessageReceiveJob(messageReceiveJobID: String): Job? diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt index 6aae0c6c9b..ffa05bf1e6 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt @@ -1,6 +1,8 @@ package org.session.libsession.messaging.jobs import okhttp3.HttpUrl +import org.session.libsession.database.MessageDataProvider +import org.session.libsession.database.StorageProtocol import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.open_groups.OpenGroupApi import org.session.libsession.messaging.sending_receiving.attachments.AttachmentId @@ -40,6 +42,36 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) // Keys used for database storage private val ATTACHMENT_ID_KEY = "attachment_id" private val TS_INCOMING_MESSAGE_ID_KEY = "tsIncoming_message_id" + + /** + * Check if the attachment in the given message is eligible for download. + * + * Note that this function only checks for the eligibility of the attachment in the sense + * of whether the download is allowed, it does not check if the download has already taken + * place. + */ + fun eligibleForDownload(threadID: Long, + storage: StorageProtocol, + messageDataProvider: MessageDataProvider, + databaseMessageID: Long): Boolean { + val threadRecipient = storage.getRecipientForThread(threadID) ?: return false + + // if we are the sender we are always eligible + val selfSend = messageDataProvider.isMmsOutgoing(databaseMessageID) + if (selfSend) { + return true + } + + // you can't be eligible without a sender + val sender = messageDataProvider.getIndividualRecipientForMms(databaseMessageID)?.address?.serialize() + ?: return false + + // you can't be eligible without a contact entry + val contact = storage.getContactWithSessionID(sender) ?: return false + + // we are eligible if we are receiving a group message or the contact is trusted + return threadRecipient.isGroupRecipient || contact.isTrusted + } } override suspend fun execute(dispatcherName: String) { @@ -88,21 +120,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) return } - val threadRecipient = storage.getRecipientForThread(threadID) - val selfSend = messageDataProvider.isMmsOutgoing(databaseMessageID) - val sender = if (selfSend) { - storage.getUserPublicKey() - } else { - messageDataProvider.getIndividualRecipientForMms(databaseMessageID)?.address?.serialize() - } - val contact = sender?.let { storage.getContactWithSessionID(it) } - if (threadRecipient == null || sender == null || (contact == null && !selfSend)) { - handleFailure(Error.NoSender, null) - return - } - if (!threadRecipient.isGroupRecipient && contact?.isTrusted != true && storage.getUserPublicKey() != sender) { - // if we aren't receiving a group message, a message from ourselves (self-send) and the contact sending is not trusted: - // do not continue, but do not fail + if (!eligibleForDownload(threadID, storage, messageDataProvider, databaseMessageID)) { handleFailure(Error.NoSender, null) return } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index 71d62bc72f..1450168ea9 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -102,7 +102,7 @@ class JobQueue : JobDelegate { execute(dispatcherName) } catch (e: Exception) { - Log.d(dispatcherName, "unhandledJobException: ${javaClass.simpleName} (id: $id)") + Log.d(dispatcherName, "unhandledJobException: ${javaClass.simpleName} (id: $id)", e) this@JobQueue.handleJobFailed(this, dispatcherName, e) } }