From 3faae5ddbee3852aa79ed7c05de598e4d9bae49d Mon Sep 17 00:00:00 2001 From: SessionHero01 <180888785+SessionHero01@users.noreply.github.com> Date: Wed, 2 Oct 2024 11:25:37 +1000 Subject: [PATCH] Getting rid of .get call on promise --- .../securesms/ApplicationContext.java | 2 + .../start/newmessage/NewMessageViewModel.kt | 15 +- .../v2/dialogs/JoinOpenGroupDialog.kt | 19 +- .../securesms/database/MmsDatabase.kt | 8 +- .../securesms/dependencies/ConfigFactory.kt | 42 ++-- .../securesms/home/HomeActivity.kt | 18 +- .../notifications/MarkReadReceiver.kt | 16 +- .../OptimizedMessageNotifier.java | 7 +- .../thoughtcrime/securesms/util/IP2Country.kt | 4 +- libsession-util/src/main/cpp/conversation.h | 1 - .../messaging/configs/ConfigSyncHandler.kt | 30 ++- .../messaging/jobs/AttachmentUploadJob.kt | 5 +- .../libsession/messaging/jobs/JobQueue.kt | 20 +- .../messaging/jobs/MessageReceiveJob.kt | 3 +- .../messaging/open_groups/OpenGroupApi.kt | 14 +- .../MessageSenderClosedGroupHandler.kt | 17 +- .../pollers/LegacyClosedGroupPollerV2.kt | 9 +- .../pollers/OpenGroupPoller.kt | 2 +- .../sending_receiving/pollers/Poller.kt | 203 +++++++++--------- .../libsession/snode/OnionRequestAPI.kt | 31 ++- .../snode/OnionRequestEncryption.kt | 90 ++++---- .../org/session/libsession/snode/SnodeAPI.kt | 48 +++-- .../libsession/snode/utilities/PromiseUtil.kt | 19 +- .../libsession/utilities/DownloadUtilities.kt | 14 +- .../org/session/libsignal/utilities/HTTP.kt | 21 +- .../libsignal/utilities/PromiseUtilities.kt | 39 +--- .../libsignal/utilities/ThreadUtils.kt | 10 +- 27 files changed, 363 insertions(+), 344 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index a57fa7dbc3..dcb2e8b08f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -115,6 +115,7 @@ import javax.inject.Inject; import dagger.hilt.EntryPoints; import dagger.hilt.android.HiltAndroidApp; +import kotlin.Unit; import network.loki.messenger.BuildConfig; import network.loki.messenger.R; @@ -307,6 +308,7 @@ public class ApplicationContext extends Application implements DefaultLifecycleO startPollingIfNeeded(); OpenGroupManager.INSTANCE.startPolling(); + return Unit.INSTANCE; }); // fetch last version data diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/start/newmessage/NewMessageViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/start/newmessage/NewMessageViewModel.kt index 0eea896a06..c6bfa91c51 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/start/newmessage/NewMessageViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/start/newmessage/NewMessageViewModel.kt @@ -6,7 +6,6 @@ import androidx.lifecycle.viewModelScope import dagger.hilt.android.lifecycle.HiltViewModel import java.util.concurrent.TimeoutException import javax.inject.Inject -import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.MutableSharedFlow @@ -14,12 +13,12 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.update -import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import kotlinx.coroutines.withTimeout import network.loki.messenger.R import org.session.libsession.snode.SnodeAPI +import org.session.libsession.snode.utilities.await import org.session.libsignal.utilities.PublicKeyValidation -import org.session.libsignal.utilities.timeout import org.thoughtcrime.securesms.ui.GetString @HiltViewModel @@ -68,12 +67,14 @@ internal class NewMessageViewModel @Inject constructor( // This could be an ONS name _state.update { it.copy(isTextErrorColor = false, error = null, loading = true) } - loadOnsJob = viewModelScope.launch(Dispatchers.IO) { + loadOnsJob = viewModelScope.launch { try { - val publicKey = SnodeAPI.getAccountID(ons).timeout(30_000).get() - if (isActive) onPublicKey(publicKey) + val publicKey = withTimeout(30_000L, { + SnodeAPI.getAccountID(ons).await() + }) + onPublicKey(publicKey) } catch (e: Exception) { - if (isActive) onError(e) + onError(e) } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/dialogs/JoinOpenGroupDialog.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/dialogs/JoinOpenGroupDialog.kt index 4dce23d177..f2bd9fd222 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/dialogs/JoinOpenGroupDialog.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/dialogs/JoinOpenGroupDialog.kt @@ -9,7 +9,11 @@ import android.text.SpannableStringBuilder import android.text.style.StyleSpan import android.widget.Toast import androidx.fragment.app.DialogFragment +import androidx.lifecycle.lifecycleScope import com.squareup.phrase.Phrase +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext import network.loki.messenger.R import org.session.libsession.database.StorageProtocol import org.session.libsession.utilities.OpenGroupUrlParser @@ -43,14 +47,23 @@ class JoinOpenGroupDialog(private val name: String, private val url: String) : D private fun join() { val openGroup = OpenGroupUrlParser.parseUrl(url) val activity = requireActivity() - ThreadUtils.queue { + lifecycleScope.launch { try { - openGroup.apply { OpenGroupManager.add(server, room, serverPublicKey, activity) } - storage.onOpenGroupAdded(openGroup.server, openGroup.room) + withContext(Dispatchers.Default) { + OpenGroupManager.add( + server = openGroup.server, + room = openGroup.room, + publicKey = openGroup.serverPublicKey, + context = activity + ) + + storage.onOpenGroupAdded(openGroup.server, openGroup.room) + } } catch (e: Exception) { Toast.makeText(activity, R.string.communityErrorDescription, Toast.LENGTH_SHORT).show() } } + dismiss() } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/MmsDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/database/MmsDatabase.kt index 984c050792..fa32a9561e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/MmsDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/MmsDatabase.kt @@ -328,7 +328,7 @@ class MmsDatabase(context: Context, databaseHelper: SQLCipherOpenHelper) : Messa contentValues.put(HAS_MENTION, 0) database.update(TABLE_NAME, contentValues, ID_WHERE, arrayOf(messageId.toString())) val attachmentDatabase = get(context).attachmentDatabase() - queue(Runnable { attachmentDatabase.deleteAttachmentsForMessage(messageId) }) + queue { attachmentDatabase.deleteAttachmentsForMessage(messageId) } val threadId = getThreadIdForMessage(messageId) markAs(messageId, MmsSmsColumns.Types.BASE_DELETED_TYPE, threadId) @@ -889,7 +889,7 @@ class MmsDatabase(context: Context, databaseHelper: SQLCipherOpenHelper) : Messa } val idsAsString = queryBuilder.toString() val attachmentDatabase = get(context).attachmentDatabase() - queue(Runnable { attachmentDatabase.deleteAttachmentsForMessages(messageIds) }) + queue { attachmentDatabase.deleteAttachmentsForMessages(messageIds) } val groupReceiptDatabase = get(context).groupReceiptDatabase() groupReceiptDatabase.deleteRowsForMessages(messageIds) val database = databaseHelper.writableDatabase @@ -906,7 +906,7 @@ class MmsDatabase(context: Context, databaseHelper: SQLCipherOpenHelper) : Messa override fun deleteMessage(messageId: Long): Boolean { val threadId = getThreadIdForMessage(messageId) val attachmentDatabase = get(context).attachmentDatabase() - queue(Runnable { attachmentDatabase.deleteAttachmentsForMessage(messageId) }) + queue { attachmentDatabase.deleteAttachmentsForMessage(messageId) } val groupReceiptDatabase = get(context).groupReceiptDatabase() groupReceiptDatabase.deleteRowsForMessage(messageId) val database = databaseHelper.writableDatabase @@ -925,7 +925,7 @@ class MmsDatabase(context: Context, databaseHelper: SQLCipherOpenHelper) : Messa val attachmentDatabase = get(context).attachmentDatabase() val groupReceiptDatabase = get(context).groupReceiptDatabase() - queue(Runnable { attachmentDatabase.deleteAttachmentsForMessages(messageIds) }) + queue { attachmentDatabase.deleteAttachmentsForMessages(messageIds) } groupReceiptDatabase.deleteRowsForMessages(messageIds) val db = databaseHelper.writableDatabase diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt index f8bec9bccb..26a3a629d3 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt @@ -222,7 +222,7 @@ class ConfigFactory @Inject constructor( private val groupConfigs = ConcurrentHashMap() private val _configUpdateNotifications = MutableSharedFlow( - extraBufferCapacity = 1, + extraBufferCapacity = 5, // The notifications are normally important so we can afford to buffer a few onBufferOverflow = BufferOverflow.SUSPEND ) override val configUpdateNotifications get() = _configUpdateNotifications @@ -260,15 +260,15 @@ class ConfigFactory @Inject constructor( * @param cb A function that takes a [UserConfigsImpl] and returns a pair of the result of the operation and a boolean indicating if the configs were changed. */ private fun doWithMutableUserConfigs(cb: (UserConfigsImpl) -> Pair): T { - return withUserConfigs { configs -> - val (result, changed) = cb(configs as UserConfigsImpl) - - if (changed) { - _configUpdateNotifications.tryEmit(ConfigUpdateNotification.UserConfigs) - } - - result + val (result, changed) = withUserConfigs { configs -> + cb(configs as UserConfigsImpl) } + + if (changed) { + _configUpdateNotifications.tryEmit(ConfigUpdateNotification.UserConfigs) + } + + return result } override fun mergeUserConfigs( @@ -319,19 +319,19 @@ class ConfigFactory @Inject constructor( } private fun doWithMutableGroupConfigs(groupId: AccountId, cb: (GroupConfigsImpl) -> Pair): T { - return withGroupConfigs(groupId) { configs -> - val (result, changed) = cb(configs as GroupConfigsImpl) - - Log.d("ConfigFactory", "Group updated? $groupId: $changed") - - if (changed) { - if (!_configUpdateNotifications.tryEmit(ConfigUpdateNotification.GroupConfigsUpdated(groupId))) { - Log.e("ConfigFactory", "Unable to deliver group update notification") - } - } - - result + val (result, changed) = withGroupConfigs(groupId) { configs -> + cb(configs as GroupConfigsImpl) } + + Log.d("ConfigFactory", "Group updated? $groupId: $changed") + + if (changed) { + if (!_configUpdateNotifications.tryEmit(ConfigUpdateNotification.GroupConfigsUpdated(groupId))) { + Log.e("ConfigFactory", "Unable to deliver group update notification") + } + } + + return result } override fun withMutableGroupConfigs( diff --git a/app/src/main/java/org/thoughtcrime/securesms/home/HomeActivity.kt b/app/src/main/java/org/thoughtcrime/securesms/home/HomeActivity.kt index 93be4d7d90..d49f0f8a49 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/home/HomeActivity.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/home/HomeActivity.kt @@ -240,7 +240,7 @@ class HomeActivity : PassphraseRequiredActionBarActivity(), } lifecycleScope.launchWhenStarted { - launch(Dispatchers.IO) { + launch(Dispatchers.Default) { // Double check that the long poller is up (applicationContext as ApplicationContext).startPollingIfNeeded() // update things based on TextSecurePrefs (profile info etc) @@ -516,7 +516,7 @@ class HomeActivity : PassphraseRequiredActionBarActivity(), .put(NAME_KEY, thread.recipient.name) .format()) dangerButton(R.string.block, R.string.AccessibilityId_blockConfirm) { - lifecycleScope.launch(Dispatchers.IO) { + lifecycleScope.launch(Dispatchers.Default) { storage.setBlocked(listOf(thread.recipient), true) withContext(Dispatchers.Main) { @@ -536,7 +536,7 @@ class HomeActivity : PassphraseRequiredActionBarActivity(), title(R.string.blockUnblock) text(Phrase.from(context, R.string.blockUnblockName).put(NAME_KEY, thread.recipient.name).format()) dangerButton(R.string.blockUnblock, R.string.AccessibilityId_unblockConfirm) { - lifecycleScope.launch(Dispatchers.IO) { + lifecycleScope.launch(Dispatchers.Default) { storage.setBlocked(listOf(thread.recipient), false) withContext(Dispatchers.Main) { binding.recyclerView.adapter!!.notifyDataSetChanged() @@ -549,7 +549,7 @@ class HomeActivity : PassphraseRequiredActionBarActivity(), private fun setConversationMuted(thread: ThreadRecord, isMuted: Boolean) { if (!isMuted) { - lifecycleScope.launch(Dispatchers.IO) { + lifecycleScope.launch(Dispatchers.Default) { recipientDatabase.setMuted(thread.recipient, 0) withContext(Dispatchers.Main) { binding.recyclerView.adapter!!.notifyDataSetChanged() @@ -557,7 +557,7 @@ class HomeActivity : PassphraseRequiredActionBarActivity(), } } else { showMuteDialog(this) { until -> - lifecycleScope.launch(Dispatchers.IO) { + lifecycleScope.launch(Dispatchers.Default) { Log.d("", "**** until: $until") recipientDatabase.setMuted(thread.recipient, until) withContext(Dispatchers.Main) { @@ -569,7 +569,7 @@ class HomeActivity : PassphraseRequiredActionBarActivity(), } private fun setNotifyType(thread: ThreadRecord, newNotifyType: Int) { - lifecycleScope.launch(Dispatchers.IO) { + lifecycleScope.launch(Dispatchers.Default) { recipientDatabase.setNotifyType(thread.recipient, newNotifyType) withContext(Dispatchers.Main) { binding.recyclerView.adapter!!.notifyDataSetChanged() @@ -578,14 +578,14 @@ class HomeActivity : PassphraseRequiredActionBarActivity(), } private fun setConversationPinned(threadId: Long, pinned: Boolean) { - lifecycleScope.launch(Dispatchers.IO) { + lifecycleScope.launch(Dispatchers.Default) { storage.setPinned(threadId, pinned) homeViewModel.tryReload() } } private fun markAllAsRead(thread: ThreadRecord) { - ThreadUtils.queue { + lifecycleScope.launch(Dispatchers.Default) { MessagingModuleConfiguration.shared.storage.markConversationAsRead(thread.threadId, SnodeAPI.nowWithOffset) } } @@ -656,7 +656,7 @@ class HomeActivity : PassphraseRequiredActionBarActivity(), context ) } else { - lifecycleScope.launch(Dispatchers.IO) { + lifecycleScope.launch(Dispatchers.Default) { threadDb.deleteConversation(threadID) } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/notifications/MarkReadReceiver.kt b/app/src/main/java/org/thoughtcrime/securesms/notifications/MarkReadReceiver.kt index 505e8f0ec3..223434a92f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/notifications/MarkReadReceiver.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/notifications/MarkReadReceiver.kt @@ -6,12 +6,15 @@ import android.content.Context import android.content.Intent import android.os.AsyncTask import androidx.core.app.NotificationManagerCompat +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch import org.session.libsession.database.userAuth import org.session.libsession.messaging.MessagingModuleConfiguration.Companion.shared import org.session.libsession.messaging.messages.control.ReadReceipt import org.session.libsession.messaging.sending_receiving.MessageSender.send import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeAPI.nowWithOffset +import org.session.libsession.snode.utilities.await import org.session.libsession.utilities.SSKEnvironment import org.session.libsession.utilities.TextSecurePreferences.Companion.isReadReceiptsEnabled import org.session.libsession.utilities.associateByNotNull @@ -72,9 +75,11 @@ class MarkReadReceiver : BroadcastReceiver() { } .forEach { messageExpirationManager.startDisappearAfterRead(it.timetamp, it.address.serialize()) } - hashToDisappearAfterReadMessage(context, markedReadMessages)?.let { - fetchUpdatedExpiriesAndScheduleDeletion(context, it) - shortenExpiryOfDisappearingAfterRead(context, it) + hashToDisappearAfterReadMessage(context, markedReadMessages)?.let { hashToMessages -> + GlobalScope.launch { + fetchUpdatedExpiriesAndScheduleDeletion(context, hashToMessages) + shortenExpiryOfDisappearingAfterRead(hashToMessages) + } } } @@ -91,7 +96,6 @@ class MarkReadReceiver : BroadcastReceiver() { } private fun shortenExpiryOfDisappearingAfterRead( - context: Context, hashToMessage: Map ) { hashToMessage.entries @@ -125,12 +129,12 @@ class MarkReadReceiver : BroadcastReceiver() { } } - private fun fetchUpdatedExpiriesAndScheduleDeletion( + private suspend fun fetchUpdatedExpiriesAndScheduleDeletion( context: Context, hashToMessage: Map ) { @Suppress("UNCHECKED_CAST") - val expiries = SnodeAPI.getExpiries(hashToMessage.keys.toList(), shared.storage.userAuth!!).get()["expiries"] as Map + val expiries = SnodeAPI.getExpiries(hashToMessage.keys.toList(), shared.storage.userAuth!!).await()["expiries"] as Map hashToMessage.forEach { (hash, info) -> expiries[hash]?.let { scheduleDeletion(context, info.expirationInfo, it - info.expirationInfo.expireStarted) } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java b/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java index 5ca700cf66..8bcf43887a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java +++ b/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java @@ -16,6 +16,8 @@ import org.thoughtcrime.securesms.groups.OpenGroupManager; import java.util.concurrent.TimeUnit; +import kotlin.Unit; + public class OptimizedMessageNotifier implements MessageNotifier { private final MessageNotifier wrapped; private final Debouncer debouncer; @@ -118,7 +120,10 @@ public class OptimizedMessageNotifier implements MessageNotifier { private void performOnBackgroundThreadIfNeeded(Runnable r) { if (Looper.myLooper() == Looper.getMainLooper()) { - ThreadUtils.queue(r); + ThreadUtils.queue(() -> { + r.run(); + return Unit.INSTANCE; + }); } else { r.run(); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/IP2Country.kt b/app/src/main/java/org/thoughtcrime/securesms/util/IP2Country.kt index 446f0286c5..ccf5914b09 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/util/IP2Country.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/util/IP2Country.kt @@ -6,6 +6,8 @@ import android.content.Intent import android.content.IntentFilter import androidx.localbroadcastmanager.content.LocalBroadcastManager import com.opencsv.CSVReader +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch import org.session.libsession.snode.OnionRequestAPI import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.ThreadUtils @@ -133,7 +135,7 @@ class IP2Country private constructor(private val context: Context) { } private fun populateCacheIfNeeded() { - ThreadUtils.queue { + GlobalScope.launch { OnionRequestAPI.paths.iterator().forEach { path -> path.iterator().forEach { snode -> cacheCountryForIP(snode.ip) // Preload if needed diff --git a/libsession-util/src/main/cpp/conversation.h b/libsession-util/src/main/cpp/conversation.h index 61a3ebd462..1a8b51e2bc 100644 --- a/libsession-util/src/main/cpp/conversation.h +++ b/libsession-util/src/main/cpp/conversation.h @@ -53,7 +53,6 @@ inline jobject serialize_closed_group(JNIEnv* env, session::config::convo::group } inline jobject serialize_any(JNIEnv *env, session::config::convo::any any) { - __android_log_print(ANDROID_LOG_WARN, "DESERIALIE", "deserializing any"); if (auto* dm = std::get_if(&any)) { return serialize_one_to_one(env, *dm); } else if (auto* og = std::get_if(&any)) { diff --git a/libsession/src/main/java/org/session/libsession/messaging/configs/ConfigSyncHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/configs/ConfigSyncHandler.kt index 379a14d305..78c3be487b 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/configs/ConfigSyncHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/configs/ConfigSyncHandler.kt @@ -6,10 +6,14 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.Job +import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import network.loki.messenger.libsession_util.util.ConfigPush import org.session.libsession.database.StorageProtocol import org.session.libsession.database.userAuth @@ -29,6 +33,7 @@ import org.session.libsignal.utilities.Base64 import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Namespace import org.session.libsignal.utilities.Snode +import java.util.concurrent.Executors import javax.inject.Inject private const val TAG = "ConfigSyncHandler" @@ -44,32 +49,35 @@ class ConfigSyncHandler @Inject constructor( ) { private var job: Job? = null - @OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class) + @OptIn(DelicateCoroutinesApi::class) fun start() { require(job == null) { "Already started" } job = GlobalScope.launch { - val groupDispatchers = hashMapOf() - val userConfigDispatcher = Dispatchers.Default.limitedParallelism(1) + val groupMutex = hashMapOf() + val userMutex = Mutex() - configFactory.configUpdateNotifications.collect { changes -> + configFactory.configUpdateNotifications + .collect { changes -> try { when (changes) { is ConfigUpdateNotification.GroupConfigsDeleted -> { - groupDispatchers.remove(changes.groupId) + groupMutex.remove(changes.groupId) } is ConfigUpdateNotification.GroupConfigsUpdated -> { // Group config pushing is limited to its own dispatcher - launch(groupDispatchers.getOrPut(changes.groupId) { - Dispatchers.Default.limitedParallelism(1) - }) { - pushGroupConfigsChangesIfNeeded(changes.groupId) + launch { + groupMutex.getOrPut(changes.groupId) { Mutex() }.withLock { + pushGroupConfigsChangesIfNeeded(changes.groupId) + } } } - ConfigUpdateNotification.UserConfigs -> launch(userConfigDispatcher) { - pushUserConfigChangesIfNeeded() + ConfigUpdateNotification.UserConfigs -> launch { + userMutex.withLock { + pushUserConfigChangesIfNeeded() + } } } } catch (e: Exception) { diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt index 1e6d483603..8a91a854c9 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt @@ -12,6 +12,7 @@ import org.session.libsession.messaging.messages.Message import org.session.libsession.messaging.open_groups.OpenGroupApi import org.session.libsession.messaging.sending_receiving.MessageSender import org.session.libsession.messaging.utilities.Data +import org.session.libsession.snode.utilities.await import org.session.libsession.utilities.DecodedAudio import org.session.libsession.utilities.InputStreamMediaDataSource import org.session.libsession.utilities.UploadResult @@ -76,7 +77,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess } } - private fun upload(attachment: SignalServiceAttachmentStream, server: String, encrypt: Boolean, upload: (ByteArray) -> Promise): Pair { + private suspend fun upload(attachment: SignalServiceAttachmentStream, server: String, encrypt: Boolean, upload: (ByteArray) -> Promise): Pair { // Key val key = if (encrypt) Util.getSecretBytes(64) else ByteArray(0) // Length @@ -102,7 +103,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess drb.writeTo(b) val data = b.readByteArray() // Upload the data - val id = upload(data).get() + val id = upload(data).await() val digest = drb.transmittedDigest // Return return Pair(key, UploadResult(id, "${server}/file/$id", digest)) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index bbfca36102..199523c3dd 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -24,7 +24,7 @@ class JobQueue : JobDelegate { private var hasResumedPendingJobs = false // Just for debugging private val jobTimestampMap = ConcurrentHashMap() - private val scope = GlobalScope + private val scope: CoroutineScope = GlobalScope private val queue = Channel(UNLIMITED) private val pendingJobIds = mutableSetOf() @@ -34,9 +34,8 @@ class JobQueue : JobDelegate { private fun CoroutineScope.processWithOpenGroupDispatcher( channel: Channel, - dispatcher: CoroutineDispatcher, name: String - ) = launch(dispatcher) { + ) = launch { for (job in channel) { if (!isActive) break val openGroupId = when (job) { @@ -54,7 +53,7 @@ class JobQueue : JobDelegate { val groupChannel = if (!openGroupChannels.containsKey(openGroupId)) { Log.d("OpenGroupDispatcher", "Creating ${openGroupId.hashCode()} channel") val newGroupChannel = Channel(UNLIMITED) - launch(dispatcher) { + launch { for (groupJob in newGroupChannel) { if (!isActive) break groupJob.process(name) @@ -74,14 +73,13 @@ class JobQueue : JobDelegate { private fun CoroutineScope.processWithDispatcher( channel: Channel, - dispatcher: CoroutineDispatcher, name: String, asynchronous: Boolean = true - ) = launch(dispatcher) { + ) = launch { for (job in channel) { if (!isActive) break if (asynchronous) { - launch(dispatcher) { + launch { job.process(name) } } else { @@ -111,10 +109,10 @@ class JobQueue : JobDelegate { val mediaQueue = Channel(capacity = UNLIMITED) val openGroupQueue = Channel(capacity = UNLIMITED) - val receiveJob = processWithDispatcher(rxQueue, Dispatchers.Default.limitedParallelism(1), "rx", asynchronous = false) - val txJob = processWithDispatcher(txQueue, Dispatchers.Default.limitedParallelism(1), "tx") - val mediaJob = processWithDispatcher(mediaQueue, Dispatchers.Default.limitedParallelism(4), "media") - val openGroupJob = processWithOpenGroupDispatcher(openGroupQueue, Dispatchers.Default.limitedParallelism(8), "openGroup") + val receiveJob = processWithDispatcher(rxQueue, "rx", asynchronous = false) + val txJob = processWithDispatcher(txQueue, "tx") + val mediaJob = processWithDispatcher(mediaQueue, "media") + val openGroupJob = processWithOpenGroupDispatcher(openGroupQueue, "openGroup") while (isActive) { when (val job = queue.receive()) { diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt index 0897cd06ce..6648d84518 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt @@ -7,6 +7,7 @@ import org.session.libsession.messaging.messages.Message import org.session.libsession.messaging.sending_receiving.MessageReceiver import org.session.libsession.messaging.sending_receiving.handle import org.session.libsession.messaging.utilities.Data +import org.session.libsession.snode.utilities.await import org.session.libsignal.utilities.Log class MessageReceiveJob(val data: ByteArray, val serverHash: String? = null, val openGroupMessageServerID: Long? = null, val openGroupID: String? = null) : Job { @@ -27,7 +28,7 @@ class MessageReceiveJob(val data: ByteArray, val serverHash: String? = null, val } override suspend fun execute(dispatcherName: String) { - executeAsync(dispatcherName).get() + executeAsync(dispatcherName).await() } fun executeAsync(dispatcherName: String): Promise { diff --git a/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupApi.kt b/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupApi.kt index 1421aac18f..5e93a01018 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupApi.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupApi.kt @@ -8,7 +8,9 @@ import com.fasterxml.jackson.databind.annotation.JsonNaming import com.fasterxml.jackson.databind.type.TypeFactory import com.goterl.lazysodium.interfaces.GenericHash import com.goterl.lazysodium.interfaces.Sign +import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.launch import nl.komponents.kovenant.Promise import nl.komponents.kovenant.functional.map import okhttp3.Headers.Companion.toHeaders @@ -22,6 +24,8 @@ import org.session.libsession.messaging.utilities.SodiumUtilities.sodium import org.session.libsession.snode.OnionRequestAPI import org.session.libsession.snode.OnionResponse import org.session.libsession.snode.SnodeAPI +import org.session.libsession.snode.utilities.asyncPromise +import org.session.libsession.snode.utilities.await import org.session.libsession.utilities.TextSecurePreferences import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.Base64.decode @@ -858,7 +862,9 @@ object OpenGroupApi { } fun getDefaultRoomsIfNeeded(): Promise, Exception> { - return getAllRooms().map { groups -> + return GlobalScope.asyncPromise { + val groups = getAllRooms().await() + val earlyGroups = groups.map { group -> DefaultGroup(group.token, group.name, null) } @@ -873,15 +879,13 @@ object OpenGroupApi { } groups.map { group -> val image = try { - images[group.token]!!.get() + images[group.token]!!.await() } catch (e: Exception) { // No image or image failed to download null } DefaultGroup(group.token, group.name, image) - } - }.success { new -> - defaultRooms.tryEmit(new) + }.also(defaultRooms::tryEmit) } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSenderClosedGroupHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSenderClosedGroupHandler.kt index b7da3e892b..227babae86 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSenderClosedGroupHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSenderClosedGroupHandler.kt @@ -3,6 +3,7 @@ package org.session.libsession.messaging.sending_receiving import com.google.protobuf.ByteString +import kotlinx.coroutines.GlobalScope import nl.komponents.kovenant.Promise import nl.komponents.kovenant.deferred import org.session.libsession.messaging.MessagingModuleConfiguration @@ -14,6 +15,8 @@ import org.session.libsession.messaging.sending_receiving.MessageSender.Error import org.session.libsession.messaging.sending_receiving.notifications.PushRegistryV1 import org.session.libsession.messaging.sending_receiving.pollers.LegacyClosedGroupPollerV2 import org.session.libsession.snode.SnodeAPI +import org.session.libsession.snode.utilities.asyncPromise +import org.session.libsession.snode.utilities.await import org.session.libsession.utilities.Address import org.session.libsession.utilities.Address.Companion.fromSerialized import org.session.libsession.utilities.Device @@ -41,10 +44,8 @@ fun MessageSender.create( name: String, members: Collection ): Promise { - val deferred = deferred() - ThreadUtils.queue { + return GlobalScope.asyncPromise { // Prepare - val context = MessagingModuleConfiguration.shared.context val storage = MessagingModuleConfiguration.shared.storage val userPublicKey = storage.getUserPublicKey()!! val membersAsData = members.map { ByteString.copyFrom(Hex.fromStringCondensed(it)) } @@ -83,7 +84,7 @@ fun MessageSender.create( val closedGroupControlMessage = ClosedGroupControlMessage(closedGroupUpdateKind, groupID) closedGroupControlMessage.sentTimestamp = sentTime try { - sendNonDurably(closedGroupControlMessage, Address.fromSerialized(member), member == ourPubKey).get() + sendNonDurably(closedGroupControlMessage, Address.fromSerialized(member), member == ourPubKey).await() } catch (e: Exception) { // We failed to properly create the group so delete it's associated data (in the past // we didn't create this data until the messages successfully sent but this resulted @@ -91,8 +92,7 @@ fun MessageSender.create( storage.removeClosedGroupPublicKey(groupPublicKey) storage.removeAllClosedGroupEncryptionKeyPairs(groupPublicKey) storage.deleteConversation(threadID) - deferred.reject(e) - return@queue + throw e } } @@ -102,11 +102,8 @@ fun MessageSender.create( PushRegistryV1.register(device = device, publicKey = userPublicKey) // Start polling LegacyClosedGroupPollerV2.shared.startPolling(groupPublicKey) - // Fulfill the promise - deferred.resolve(groupID) + groupID } - // Return - return deferred.promise } fun MessageSender.setName(groupPublicKey: String, newName: String) { diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/LegacyClosedGroupPollerV2.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/LegacyClosedGroupPollerV2.kt index 3e1883f3a3..aae2b6f3ab 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/LegacyClosedGroupPollerV2.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/LegacyClosedGroupPollerV2.kt @@ -1,5 +1,6 @@ package org.session.libsession.messaging.sending_receiving.pollers +import kotlinx.coroutines.GlobalScope import nl.komponents.kovenant.Promise import nl.komponents.kovenant.functional.bind import nl.komponents.kovenant.functional.map @@ -9,6 +10,8 @@ import org.session.libsession.messaging.jobs.BatchMessageReceiveJob import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.jobs.MessageReceiveParameters import org.session.libsession.snode.SnodeAPI +import org.session.libsession.snode.utilities.asyncPromise +import org.session.libsession.snode.utilities.await import org.session.libsession.utilities.GroupUtil import org.session.libsignal.crypto.secureRandomOrNull import org.session.libsignal.utilities.Log @@ -110,13 +113,13 @@ class LegacyClosedGroupPollerV2 { when { currentForkInfo.defaultRequiresAuth() -> SnodeAPI.getUnauthenticatedRawMessages(snode, groupPublicKey, namespace = Namespace.UNAUTHENTICATED_CLOSED_GROUP()) .map { SnodeAPI.parseRawMessagesResponse(it, snode, groupPublicKey, Namespace.UNAUTHENTICATED_CLOSED_GROUP()) } - currentForkInfo.hasNamespaces() -> task { + currentForkInfo.hasNamespaces() -> GlobalScope.asyncPromise { val unAuthed = SnodeAPI.getUnauthenticatedRawMessages(snode, groupPublicKey, namespace = Namespace.UNAUTHENTICATED_CLOSED_GROUP()) .map { SnodeAPI.parseRawMessagesResponse(it, snode, groupPublicKey, Namespace.UNAUTHENTICATED_CLOSED_GROUP()) } val default = SnodeAPI.getUnauthenticatedRawMessages(snode, groupPublicKey, namespace = Namespace.DEFAULT()) .map { SnodeAPI.parseRawMessagesResponse(it, snode, groupPublicKey, Namespace.DEFAULT()) } - val unAuthedResult = unAuthed.get() - val defaultResult = default.get() + val unAuthedResult = unAuthed.await() + val defaultResult = default.await() val format = DateFormat.getTimeInstance() if (unAuthedResult.isNotEmpty() || defaultResult.isNotEmpty()) { Log.d("Poller", "@${format.format(Date())}Polled ${unAuthedResult.size} from -10, ${defaultResult.size} from 0") diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt index 3b7db96802..e834caa756 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt @@ -25,12 +25,12 @@ import org.session.libsession.messaging.sending_receiving.MessageReceiver import org.session.libsession.messaging.sending_receiving.handle import org.session.libsession.messaging.sending_receiving.handleOpenGroupReactions import org.session.libsession.snode.OnionRequestAPI +import org.session.libsession.snode.utilities.successBackground import org.session.libsession.utilities.Address import org.session.libsession.utilities.GroupUtil import org.session.libsignal.protos.SignalServiceProtos import org.session.libsignal.utilities.Base64 import org.session.libsignal.utilities.Log -import org.session.libsignal.utilities.successBackground import java.util.UUID import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledFuture diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt index 56357fd837..d55a805657 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt @@ -2,14 +2,12 @@ package org.session.libsession.messaging.sending_receiving.pollers import android.util.SparseArray import androidx.core.util.valueIterator -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.GlobalScope import nl.komponents.kovenant.Deferred import nl.komponents.kovenant.Promise import nl.komponents.kovenant.deferred import nl.komponents.kovenant.functional.bind import nl.komponents.kovenant.resolve -import nl.komponents.kovenant.task import org.session.libsession.database.StorageProtocol import org.session.libsession.database.userAuth import org.session.libsession.messaging.MessagingModuleConfiguration @@ -19,6 +17,7 @@ import org.session.libsession.messaging.jobs.MessageReceiveParameters import org.session.libsession.snode.RawResponse import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeModule +import org.session.libsession.snode.utilities.asyncPromise import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsession.utilities.ConfigMessage import org.session.libsession.utilities.UserConfigType @@ -177,7 +176,7 @@ class Poller( return poll(snode, deferred) } - private fun pollUserProfile(snode: Snode, deferred: Deferred): Promise = task { + private fun pollUserProfile(snode: Snode, deferred: Deferred): Promise = GlobalScope.asyncPromise { val requests = mutableListOf() val hashesToExtend = mutableSetOf() val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth) @@ -236,116 +235,114 @@ class Poller( private fun poll(snode: Snode, deferred: Deferred): Promise { if (!hasStarted) { return Promise.ofFail(PromiseCanceledException()) } - return task { - runBlocking(Dispatchers.IO) { - val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth) - val requestSparseArray = SparseArray() - // get messages - SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - lastHash = lokiApiDatabase.getLastMessageHashValue( - snode = snode, - publicKey = userAuth.accountId.hexString, - namespace = Namespace.DEFAULT() - ), - auth = userAuth, - maxSize = -2) - .also { personalMessages -> - // namespaces here should always be set - requestSparseArray[personalMessages.namespace!!] = personalMessages - } - // get the latest convo info volatile - val hashesToExtend = mutableSetOf() - configFactory.withUserConfigs { configs -> - UserConfigType - .entries - .map { type -> - val config = configs.getConfig(type) - hashesToExtend += config.currentHashes() - type.namespace to SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - lastHash = lokiApiDatabase.getLastMessageHashValue( - snode = snode, - publicKey = userAuth.accountId.hexString, - namespace = type.namespace - ), - auth = userAuth, - namespace = type.namespace, - maxSize = -8 - ) - } - }.forEach { (namespace, request) -> - // namespaces here should always be set - requestSparseArray[namespace] = request - } - - val requests = - requestSparseArray.valueIterator().asSequence().toMutableList() - - if (hashesToExtend.isNotEmpty()) { - SnodeAPI.buildAuthenticatedAlterTtlBatchRequest( - messageHashes = hashesToExtend.toList(), - auth = userAuth, - newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds, - extend = true - ).let { extensionRequest -> - requests += extensionRequest + return GlobalScope.asyncPromise { + val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth) + val requestSparseArray = SparseArray() + // get messages + SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + lastHash = lokiApiDatabase.getLastMessageHashValue( + snode = snode, + publicKey = userAuth.accountId.hexString, + namespace = Namespace.DEFAULT() + ), + auth = userAuth, + maxSize = -2) + .also { personalMessages -> + // namespaces here should always be set + requestSparseArray[personalMessages.namespace!!] = personalMessages + } + // get the latest convo info volatile + val hashesToExtend = mutableSetOf() + configFactory.withUserConfigs { configs -> + UserConfigType + .entries + .map { type -> + val config = configs.getConfig(type) + hashesToExtend += config.currentHashes() + type.namespace to SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + lastHash = lokiApiDatabase.getLastMessageHashValue( + snode = snode, + publicKey = userAuth.accountId.hexString, + namespace = type.namespace + ), + auth = userAuth, + namespace = type.namespace, + maxSize = -8 + ) } + }.forEach { (namespace, request) -> + // namespaces here should always be set + requestSparseArray[namespace] = request + } + + val requests = + requestSparseArray.valueIterator().asSequence().toMutableList() + + if (hashesToExtend.isNotEmpty()) { + SnodeAPI.buildAuthenticatedAlterTtlBatchRequest( + messageHashes = hashesToExtend.toList(), + auth = userAuth, + newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds, + extend = true + ).let { extensionRequest -> + requests += extensionRequest } + } - if (requests.isNotEmpty()) { - SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses -> - isCaughtUp = true - if (deferred.promise.isDone()) { - return@bind Promise.ofSuccess(Unit) - } else { - val responseList = (rawResponses["results"] as List) - // in case we had null configs, the array won't be fully populated - // index of the sparse array key iterator should be the request index, with the key being the namespace - UserConfigType.entries - .map { type -> type to requestSparseArray.indexOfKey(type.namespace) } - .filter { (_, i) -> i >= 0 } - .forEach { (configType, requestIndex) -> - responseList.getOrNull(requestIndex)?.let { rawResponse -> - if (rawResponse["code"] as? Int != 200) { - Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") - return@forEach - } - val body = rawResponse["body"] as? RawResponse - if (body == null) { - Log.e(TAG, "Batch sub-request didn't contain a body") - return@forEach - } - - processConfig(snode, body, configType) - } - } - - // the first response will be the personal messages (we want these to be processed after config messages) - val personalResponseIndex = requestSparseArray.indexOfKey(Namespace.DEFAULT()) - if (personalResponseIndex >= 0) { - responseList.getOrNull(personalResponseIndex)?.let { rawResponse -> + if (requests.isNotEmpty()) { + SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses -> + isCaughtUp = true + if (deferred.promise.isDone()) { + return@bind Promise.ofSuccess(Unit) + } else { + val responseList = (rawResponses["results"] as List) + // in case we had null configs, the array won't be fully populated + // index of the sparse array key iterator should be the request index, with the key being the namespace + UserConfigType.entries + .map { type -> type to requestSparseArray.indexOfKey(type.namespace) } + .filter { (_, i) -> i >= 0 } + .forEach { (configType, requestIndex) -> + responseList.getOrNull(requestIndex)?.let { rawResponse -> if (rawResponse["code"] as? Int != 200) { - Log.e(TAG, "Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") - // If we got a non-success response then the snode might be bad so we should try rotate - // to a different one just in case - pollNextSnode(deferred = deferred) - return@bind Promise.ofSuccess(Unit) - } else { - val body = rawResponse["body"] as? RawResponse - if (body == null) { - Log.e(TAG, "Batch sub-request for personal messages didn't contain a body") - } else { - processPersonalMessages(snode, body) - } + Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") + return@forEach } + val body = rawResponse["body"] as? RawResponse + if (body == null) { + Log.e(TAG, "Batch sub-request didn't contain a body") + return@forEach + } + + processConfig(snode, body, configType) } } - poll(snode, deferred) + // the first response will be the personal messages (we want these to be processed after config messages) + val personalResponseIndex = requestSparseArray.indexOfKey(Namespace.DEFAULT()) + if (personalResponseIndex >= 0) { + responseList.getOrNull(personalResponseIndex)?.let { rawResponse -> + if (rawResponse["code"] as? Int != 200) { + Log.e(TAG, "Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") + // If we got a non-success response then the snode might be bad so we should try rotate + // to a different one just in case + pollNextSnode(deferred = deferred) + return@bind Promise.ofSuccess(Unit) + } else { + val body = rawResponse["body"] as? RawResponse + if (body == null) { + Log.e(TAG, "Batch sub-request for personal messages didn't contain a body") + } else { + processPersonalMessages(snode, body) + } + } + } } - }.fail { - Log.e(TAG, "Failed to get raw batch response", it) + poll(snode, deferred) } + }.fail { + Log.e(TAG, "Failed to get raw batch response", it) + poll(snode, deferred) } } } diff --git a/libsession/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt b/libsession/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt index 9ff541a9d5..91eb35fbfd 100644 --- a/libsession/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt @@ -1,5 +1,8 @@ package org.session.libsession.snode +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch import nl.komponents.kovenant.Deferred import nl.komponents.kovenant.Promise import nl.komponents.kovenant.all @@ -8,6 +11,7 @@ import nl.komponents.kovenant.functional.bind import nl.komponents.kovenant.functional.map import okhttp3.Request import org.session.libsession.messaging.file_server.FileServerApi +import org.session.libsession.snode.utilities.asyncPromise import org.session.libsession.utilities.AESGCM import org.session.libsession.utilities.AESGCM.EncryptionResult import org.session.libsession.utilities.getBodyForOnionRequest @@ -27,6 +31,7 @@ import org.session.libsignal.utilities.recover import org.session.libsignal.utilities.toHexString import java.util.concurrent.atomic.AtomicReference import kotlin.collections.set +import kotlin.coroutines.EmptyCoroutineContext private typealias Path = List @@ -112,26 +117,14 @@ object OnionRequestAPI { * Tests the given snode. The returned promise errors out if the snode is faulty; the promise is fulfilled otherwise. */ private fun testSnode(snode: Snode): Promise { - val deferred = deferred() - ThreadUtils.queue { // No need to block the shared context for this + return GlobalScope.asyncPromise { // No need to block the shared context for this val url = "${snode.address}:${snode.port}/get_stats/v1" - try { - val response = HTTP.execute(HTTP.Verb.GET, url, 3).decodeToString() - val json = JsonUtil.fromJson(response, Map::class.java) - val version = json["version"] as? String - if (version == null) { deferred.reject(Exception("Missing snode version.")); return@queue } - if (version >= "2.0.7") { - deferred.resolve(Unit) - } else { - val message = "Unsupported snode version: $version." - Log.d("Loki", message) - deferred.reject(Exception(message)) - } - } catch (exception: Exception) { - deferred.reject(exception) - } + val response = HTTP.execute(HTTP.Verb.GET, url, 3).decodeToString() + val json = JsonUtil.fromJson(response, Map::class.java) + val version = json["version"] as? String + require(version != null) { "Missing snode version." } + require(version >= "2.0.7") { "Unsupported snode version: $version." } } - return deferred.promise } /** @@ -359,7 +352,7 @@ object OnionRequestAPI { return@success deferred.reject(exception) } val destinationSymmetricKey = result.destinationSymmetricKey - ThreadUtils.queue { + GlobalScope.launch { try { val response = HTTP.execute(HTTP.Verb.POST, url, body) handleResponse(response, destinationSymmetricKey, destination, version, deferred) diff --git a/libsession/src/main/java/org/session/libsession/snode/OnionRequestEncryption.kt b/libsession/src/main/java/org/session/libsession/snode/OnionRequestEncryption.kt index 3e31a52e5a..888d2c1d5f 100644 --- a/libsession/src/main/java/org/session/libsession/snode/OnionRequestEncryption.kt +++ b/libsession/src/main/java/org/session/libsession/snode/OnionRequestEncryption.kt @@ -1,8 +1,10 @@ package org.session.libsession.snode +import kotlinx.coroutines.GlobalScope import nl.komponents.kovenant.Promise import nl.komponents.kovenant.deferred import org.session.libsession.snode.OnionRequestAPI.Destination +import org.session.libsession.snode.utilities.asyncPromise import org.session.libsession.utilities.AESGCM import org.session.libsession.utilities.AESGCM.EncryptionResult import org.session.libsignal.utilities.toHexString @@ -37,68 +39,56 @@ object OnionRequestEncryption { destination: Destination, version: Version ): Promise { - val deferred = deferred() - ThreadUtils.queue { - try { - val plaintext = if (version == Version.V4) { - payload - } else { - // Wrapping isn't needed for file server or open group onion requests - when (destination) { - is Destination.Snode -> encode(payload, mapOf("headers" to "")) - is Destination.Server -> payload - } + return GlobalScope.asyncPromise { + val plaintext = if (version == Version.V4) { + payload + } else { + // Wrapping isn't needed for file server or open group onion requests + when (destination) { + is Destination.Snode -> encode(payload, mapOf("headers" to "")) + is Destination.Server -> payload } - val x25519PublicKey = when (destination) { - is Destination.Snode -> destination.snode.publicKeySet!!.x25519Key - is Destination.Server -> destination.x25519PublicKey - } - val result = AESGCM.encrypt(plaintext, x25519PublicKey) - deferred.resolve(result) - } catch (exception: Exception) { - deferred.reject(exception) } + val x25519PublicKey = when (destination) { + is Destination.Snode -> destination.snode.publicKeySet!!.x25519Key + is Destination.Server -> destination.x25519PublicKey + } + AESGCM.encrypt(plaintext, x25519PublicKey) } - return deferred.promise } /** * Encrypts the previous encryption result (i.e. that of the hop after this one) for this hop. Use this to build the layers of an onion request. */ internal fun encryptHop(lhs: Destination, rhs: Destination, previousEncryptionResult: EncryptionResult): Promise { - val deferred = deferred() - ThreadUtils.queue { - try { - val payload: MutableMap = when (rhs) { - is Destination.Snode -> { - mutableMapOf( "destination" to rhs.snode.publicKeySet!!.ed25519Key ) - } - is Destination.Server -> { - mutableMapOf( - "host" to rhs.host, - "target" to rhs.target, - "method" to "POST", - "protocol" to rhs.scheme, - "port" to rhs.port - ) - } + return GlobalScope.asyncPromise { + val payload: MutableMap = when (rhs) { + is Destination.Snode -> { + mutableMapOf("destination" to rhs.snode.publicKeySet!!.ed25519Key) } - payload["ephemeral_key"] = previousEncryptionResult.ephemeralPublicKey.toHexString() - val x25519PublicKey = when (lhs) { - is Destination.Snode -> { - lhs.snode.publicKeySet!!.x25519Key - } - is Destination.Server -> { - lhs.x25519PublicKey - } + + is Destination.Server -> { + mutableMapOf( + "host" to rhs.host, + "target" to rhs.target, + "method" to "POST", + "protocol" to rhs.scheme, + "port" to rhs.port + ) } - val plaintext = encode(previousEncryptionResult.ciphertext, payload) - val result = AESGCM.encrypt(plaintext, x25519PublicKey) - deferred.resolve(result) - } catch (exception: Exception) { - deferred.reject(exception) } + payload["ephemeral_key"] = previousEncryptionResult.ephemeralPublicKey.toHexString() + val x25519PublicKey = when (lhs) { + is Destination.Snode -> { + lhs.snode.publicKeySet!!.x25519Key + } + + is Destination.Server -> { + lhs.x25519PublicKey + } + } + val plaintext = encode(previousEncryptionResult.ciphertext, payload) + AESGCM.encrypt(plaintext, x25519PublicKey) } - return deferred.promise } } diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index 38853faa6c..c484259d50 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -9,6 +9,7 @@ import com.goterl.lazysodium.interfaces.GenericHash import com.goterl.lazysodium.interfaces.PwHash import com.goterl.lazysodium.interfaces.SecretBox import com.goterl.lazysodium.utils.Key +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope @@ -27,6 +28,7 @@ import nl.komponents.kovenant.unwrap import org.session.libsession.messaging.utilities.MessageWrapper import org.session.libsession.messaging.utilities.SodiumUtilities.sodium import org.session.libsession.snode.model.BatchResponse +import org.session.libsession.snode.utilities.asyncPromise import org.session.libsession.snode.utilities.await import org.session.libsession.snode.utilities.retrySuspendAsPromise import org.session.libsession.utilities.mapValuesNotNull @@ -137,7 +139,7 @@ object SnodeAPI { JsonUtil.fromJson(it.body ?: throw Error.Generic, Map::class.java) } - else -> task { + else -> GlobalScope.asyncPromise { HTTP.execute( HTTP.Verb.POST, url = "${snode.address}:${snode.port}/storage_rpc/v1", @@ -169,17 +171,15 @@ object SnodeAPI { JsonUtil.fromJson(resp.body ?: throw Error.Generic, responseClass) } - else -> withContext(Dispatchers.IO) { - HTTP.execute( - HTTP.Verb.POST, - url = "${snode.address}:${snode.port}/storage_rpc/v1", - parameters = buildMap { - this["method"] = method.rawValue - this["params"] = parameters - } - ).toString().let { - JsonUtil.fromJson(it, responseClass) + else -> HTTP.execute( + HTTP.Verb.POST, + url = "${snode.address}:${snode.port}/storage_rpc/v1", + parameters = buildMap { + this["method"] = method.rawValue + this["params"] = parameters } + ).toString().let { + JsonUtil.fromJson(it, responseClass) } } @@ -192,7 +192,7 @@ object SnodeAPI { } internal fun getRandomSnode(): Promise = - snodePool.takeIf { it.size >= minimumSnodePoolCount }?.secureRandom()?.let { Promise.of(it) } ?: task { + snodePool.takeIf { it.size >= minimumSnodePoolCount }?.secureRandom()?.let { Promise.of(it) } ?: GlobalScope.asyncPromise { val target = seedNodePool.random() Log.d("Loki", "Populating snode pool using: $target.") val url = "$target/json_rpc" @@ -241,7 +241,7 @@ object SnodeAPI { } // Public API - fun getAccountID(onsName: String): Promise = task { + fun getAccountID(onsName: String): Promise = GlobalScope.asyncPromise { val validationCount = 3 val accountIDByteCount = 33 // Hash the ONS name using BLAKE2b @@ -630,11 +630,16 @@ object SnodeAPI { getBatchResponse( snode = snode, publicKey = batch.first().publicKey, - requests = batch.map { it.request }, sequence = false + requests = batch.mapNotNull { info -> + info.request.takeIf { !info.callback.isClosedForSend } + }, + sequence = false ) } catch (e: Exception) { for (req in batch) { - req.callback.send(Result.failure(e)) + runCatching { + req.callback.send(Result.failure(e)) + } } return@batch } @@ -650,7 +655,9 @@ object SnodeAPI { JsonUtil.fromJson(resp.body, req.responseType) } - req.callback.send(result) + runCatching{ + req.callback.send(result) + } } // Close all channels in the requests just in case we don't have paired up @@ -673,7 +680,14 @@ object SnodeAPI { val callback = Channel>() @Suppress("UNCHECKED_CAST") batchedRequestsSender.send(RequestInfo(snode, publicKey, request, responseType, callback as SendChannel)) - return callback.receive().getOrThrow() + try { + return callback.receive().getOrThrow() + } catch (e: CancellationException) { + // Close the channel if the coroutine is cancelled, so the batch processing won't + // handle this one (best effort only) + callback.close() + throw e + } } suspend fun sendBatchRequest( diff --git a/libsession/src/main/java/org/session/libsession/snode/utilities/PromiseUtil.kt b/libsession/src/main/java/org/session/libsession/snode/utilities/PromiseUtil.kt index 6bbff7e931..bed3163f0d 100644 --- a/libsession/src/main/java/org/session/libsession/snode/utilities/PromiseUtil.kt +++ b/libsession/src/main/java/org/session/libsession/snode/utilities/PromiseUtil.kt @@ -2,10 +2,14 @@ package org.session.libsession.snode.utilities import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import nl.komponents.kovenant.Promise import nl.komponents.kovenant.deferred +import org.session.libsignal.utilities.Log +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine @@ -17,9 +21,20 @@ suspend fun Promise.await(): T { } } -fun CoroutineScope.asyncPromise(block: suspend () -> T): Promise { +fun Promise.successBackground(callback: (value: V) -> Unit): Promise { + GlobalScope.launch { + try { + callback(this@successBackground.await()) + } catch (e: Exception) { + Log.d("Loki", "Failed to execute task in background: ${e.message}.") + } + } + return this +} + +fun CoroutineScope.asyncPromise(context: CoroutineContext = EmptyCoroutineContext, block: suspend () -> T): Promise { val defer = deferred() - launch { + launch(context) { try { defer.resolve(block()) } catch (e: Exception) { diff --git a/libsession/src/main/java/org/session/libsession/utilities/DownloadUtilities.kt b/libsession/src/main/java/org/session/libsession/utilities/DownloadUtilities.kt index a7b19ed6e5..fd53ec5a16 100644 --- a/libsession/src/main/java/org/session/libsession/utilities/DownloadUtilities.kt +++ b/libsession/src/main/java/org/session/libsession/utilities/DownloadUtilities.kt @@ -1,8 +1,11 @@ package org.session.libsession.utilities +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext import okhttp3.HttpUrl import okhttp3.HttpUrl.Companion.toHttpUrlOrNull import org.session.libsession.messaging.file_server.FileServerApi +import org.session.libsession.snode.utilities.await import org.session.libsignal.utilities.HTTP import org.session.libsignal.utilities.Log import java.io.File @@ -14,8 +17,7 @@ object DownloadUtilities { /** * Blocks the calling thread. */ - @JvmStatic - fun downloadFile(destination: File, url: String) { + suspend fun downloadFile(destination: File, url: String) { val outputStream = FileOutputStream(destination) // Throws var remainingAttempts = 2 var exception: Exception? = null @@ -35,13 +37,13 @@ object DownloadUtilities { /** * Blocks the calling thread. */ - @JvmStatic - fun downloadFile(outputStream: OutputStream, urlAsString: String) { + suspend fun downloadFile(outputStream: OutputStream, urlAsString: String) { val url = urlAsString.toHttpUrlOrNull()!! val fileID = url.pathSegments.last() try { - FileServerApi.download(fileID).get().let { - outputStream.write(it) + val data = FileServerApi.download(fileID).await() + withContext(Dispatchers.IO) { + outputStream.write(data) } } catch (e: Exception) { when (e) { diff --git a/libsignal/src/main/java/org/session/libsignal/utilities/HTTP.kt b/libsignal/src/main/java/org/session/libsignal/utilities/HTTP.kt index 4f8b20f7c6..2df9ee9802 100644 --- a/libsignal/src/main/java/org/session/libsignal/utilities/HTTP.kt +++ b/libsignal/src/main/java/org/session/libsignal/utilities/HTTP.kt @@ -1,10 +1,14 @@ package org.session.libsignal.utilities import android.util.Log +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext +import okhttp3.Call import okhttp3.MediaType.Companion.toMediaType import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.RequestBody +import okhttp3.Response import org.session.libsignal.utilities.Util.SECURE_RANDOM import java.security.cert.X509Certificate import java.util.concurrent.TimeUnit @@ -81,14 +85,14 @@ object HTTP { /** * Sync. Don't call from the main thread. */ - fun execute(verb: Verb, url: String, timeout: Long = HTTP.timeout, useSeedNodeConnection: Boolean = false): ByteArray { + suspend fun execute(verb: Verb, url: String, timeout: Long = HTTP.timeout, useSeedNodeConnection: Boolean = false): ByteArray { return execute(verb = verb, url = url, body = null, timeout = timeout, useSeedNodeConnection = useSeedNodeConnection) } /** * Sync. Don't call from the main thread. */ - fun execute(verb: Verb, url: String, parameters: Map?, timeout: Long = HTTP.timeout, useSeedNodeConnection: Boolean = false): ByteArray { + suspend fun execute(verb: Verb, url: String, parameters: Map?, timeout: Long = HTTP.timeout, useSeedNodeConnection: Boolean = false): ByteArray { return if (parameters != null) { val body = JsonUtil.toJson(parameters).toByteArray() execute(verb = verb, url = url, body = body, timeout = timeout, useSeedNodeConnection = useSeedNodeConnection) @@ -100,7 +104,7 @@ object HTTP { /** * Sync. Don't call from the main thread. */ - fun execute(verb: Verb, url: String, body: ByteArray?, timeout: Long = HTTP.timeout, useSeedNodeConnection: Boolean = false): ByteArray { + suspend fun execute(verb: Verb, url: String, body: ByteArray?, timeout: Long = HTTP.timeout, useSeedNodeConnection: Boolean = false): ByteArray { val request = Request.Builder().url(url) .removeHeader("User-Agent").addHeader("User-Agent", "WhatsApp") // Set a fake value .removeHeader("Accept-Language").addHeader("Accept-Language", "en-us") // Set a fake value @@ -125,7 +129,7 @@ object HTTP { } useSeedNodeConnection -> seedNodeConnection else -> defaultConnection - }.newCall(request.build()).execute().use { response -> + }.newCall(request.build()).await().use { response -> when (val statusCode = response.code) { 200 -> response.body!!.bytes() else -> { @@ -143,4 +147,13 @@ object HTTP { throw HTTPRequestFailedException(0, null, "HTTP request failed due to: ${exception.message}") } } + + @Suppress("OPT_IN_USAGE") + private val httpCallDispatcher = Dispatchers.IO.limitedParallelism(3) + + private suspend fun Call.await(): Response { + return withContext(httpCallDispatcher) { + execute() + } + } } diff --git a/libsignal/src/main/java/org/session/libsignal/utilities/PromiseUtilities.kt b/libsignal/src/main/java/org/session/libsignal/utilities/PromiseUtilities.kt index fdf8f107b9..d4f869aa24 100644 --- a/libsignal/src/main/java/org/session/libsignal/utilities/PromiseUtilities.kt +++ b/libsignal/src/main/java/org/session/libsignal/utilities/PromiseUtilities.kt @@ -4,19 +4,9 @@ package org.session.libsignal.utilities import nl.komponents.kovenant.Promise import nl.komponents.kovenant.deferred import nl.komponents.kovenant.functional.map -import nl.komponents.kovenant.task -import java.util.concurrent.TimeoutException -fun emptyPromise() = EMPTY_PROMISE -private val EMPTY_PROMISE: Promise<*, java.lang.Exception> = task {} +fun emptyPromise() = Promise.of(Unit) -fun Promise.get(defaultValue: V): V { - return try { - get() - } catch (e: Exception) { - defaultValue - } -} fun Promise.recover(callback: (exception: E) -> V): Promise { val deferred = deferred() @@ -33,33 +23,6 @@ fun Promise.recover(callback: (exception: E) -> V): Pro return deferred.promise } -fun Promise.successBackground(callback: (value: V) -> Unit): Promise { - ThreadUtils.queue { - try { - callback(get()) - } catch (e: Exception) { - Log.d("Loki", "Failed to execute task in background: ${e.message}.") - } - } - return this -} - -fun Promise.timeout(millis: Long): Promise { - if (this.isDone()) { return this; } - val deferred = deferred() - ThreadUtils.queue { - Thread.sleep(millis) - if (!deferred.promise.isDone()) { - deferred.reject(TimeoutException("Promise timed out.")) - } - } - this.success { - if (!deferred.promise.isDone()) { deferred.resolve(it) } - }.fail { - if (!deferred.promise.isDone()) { deferred.reject(it) } - } - return deferred.promise -} infix fun Promise.sideEffect( callback: (value: V) -> Unit diff --git a/libsignal/src/main/java/org/session/libsignal/utilities/ThreadUtils.kt b/libsignal/src/main/java/org/session/libsignal/utilities/ThreadUtils.kt index e3560e3f4f..a8b4e79b5e 100644 --- a/libsignal/src/main/java/org/session/libsignal/utilities/ThreadUtils.kt +++ b/libsignal/src/main/java/org/session/libsignal/utilities/ThreadUtils.kt @@ -14,16 +14,10 @@ object ThreadUtils { const val PRIORITY_IMPORTANT_BACKGROUND_THREAD = Process.THREAD_PRIORITY_DEFAULT + Process.THREAD_PRIORITY_LESS_FAVORABLE - // Note: To see how many threads are running in our app at any given time we can use: - // val threadCount = getAllStackTraces().size - + @Deprecated("Use a proper coroutine context/dispatcher instead, so it's clearer what priority you want the work to be done") @JvmStatic - fun queue(target: Runnable) { - queue(target::run) - } - fun queue(target: () -> Unit) { - Dispatchers.IO.dispatch(EmptyCoroutineContext) { + Dispatchers.Default.dispatch(EmptyCoroutineContext) { try { target() } catch (e: Exception) {