From 9eacdd7b3e6cffe77fe0a583b3f13b5c1ab38e92 Mon Sep 17 00:00:00 2001 From: Ryan ZHAO Date: Wed, 27 Jan 2021 10:54:25 +1100 Subject: [PATCH] sync update to libsession & clean --- .../securesms/audio/AudioRecorder.java | 4 +- libsession/build.gradle | 3 - .../messaging/opengroups/OpenGroup.kt | 14 ++-- .../messaging/opengroups/OpenGroupAPI.kt | 56 +++++++------ .../messaging/opengroups/OpenGroupInfo.kt | 8 +- .../messaging/opengroups/OpenGroupMessage.kt | 78 +++++++++---------- .../libsession/snode/OnionRequestAPI.kt | 19 ++--- .../snode/OnionRequestEncryption.kt | 5 +- .../org/session/libsession/snode/SnodeAPI.kt | 22 +++--- .../libsession/utilities/PromiseUtilities.kt | 37 +++++++++ .../libsession/utilities/ThreadUtil.java | 19 ----- .../libsession/utilities/ThreadUtils.kt | 26 +++++++ 12 files changed, 171 insertions(+), 120 deletions(-) delete mode 100644 libsession/src/main/java/org/session/libsession/utilities/ThreadUtil.java create mode 100644 libsession/src/main/java/org/session/libsession/utilities/ThreadUtils.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/audio/AudioRecorder.java b/app/src/main/java/org/thoughtcrime/securesms/audio/AudioRecorder.java index a5d72c4f5a..04ed137b3f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/audio/AudioRecorder.java +++ b/app/src/main/java/org/thoughtcrime/securesms/audio/AudioRecorder.java @@ -12,7 +12,7 @@ import android.util.Pair; import org.thoughtcrime.securesms.providers.BlobProvider; import org.thoughtcrime.securesms.util.MediaUtil; -import org.session.libsession.utilities.ThreadUtil; +import org.session.libsession.utilities.ThreadUtils; import org.session.libsession.utilities.Util; import org.session.libsession.utilities.concurrent.ListenableFuture; import org.session.libsession.utilities.concurrent.SettableFuture; @@ -25,7 +25,7 @@ public class AudioRecorder { private static final String TAG = AudioRecorder.class.getSimpleName(); - private static final ExecutorService executor = ThreadUtil.newDynamicSingleThreadedExecutor(); + private static final ExecutorService executor = ThreadUtils.newDynamicSingleThreadedExecutor(); private final Context context; diff --git a/libsession/build.gradle b/libsession/build.gradle index cb7d54107c..0f2b5c21d3 100644 --- a/libsession/build.gradle +++ b/libsession/build.gradle @@ -1,7 +1,6 @@ plugins { id 'com.android.library' id 'kotlin-android' - id 'kotlin-kapt' } android { @@ -48,8 +47,6 @@ dependencies { androidTestImplementation 'androidx.test.ext:junit:1.1.2' androidTestImplementation 'androidx.test.espresso:espresso-core:3.3.0' implementation 'com.github.bumptech.glide:glide:4.11.0' -// annotationProcessor 'com.github.bumptech.glide:compiler:4.11.0' -// kapt 'com.github.bumptech.glide:compiler:4.11.0' implementation 'com.amulyakhare:com.amulyakhare.textdrawable:1.0.1' implementation 'com.annimon:stream:1.1.8' implementation 'com.makeramen:roundedimageview:2.1.0' diff --git a/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroup.kt b/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroup.kt index 868bb02fe4..16b713c6b6 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroup.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroup.kt @@ -2,14 +2,14 @@ package org.session.libsession.messaging.opengroups import org.session.libsignal.service.internal.util.JsonUtil -public data class OpenGroup( - public val channel: Long, +data class OpenGroup( + val channel: Long, private val serverURL: String, - public val displayName: String, - public val isDeletable: Boolean + val displayName: String, + val isDeletable: Boolean ) { - public val server get() = serverURL.toLowerCase() - public val id get() = getId(channel, server) + val server get() = serverURL.toLowerCase() + val id get() = getId(channel, server) companion object { @@ -31,7 +31,7 @@ public data class OpenGroup( } } - public fun toJSON(): Map { + fun toJSON(): Map { return mapOf( "channel" to channel, "server" to server, "displayName" to displayName, "isDeletable" to isDeletable ) } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupAPI.kt b/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupAPI.kt index c500f65080..a271ca2de3 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupAPI.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupAPI.kt @@ -9,14 +9,16 @@ import org.session.libsession.messaging.MessagingConfiguration import org.session.libsession.messaging.utilities.DotNetAPI import org.session.libsession.messaging.fileserver.FileServerAPI +import org.session.libsession.utilities.ThreadUtils +import org.session.libsession.utilities.createContext import org.session.libsignal.libsignal.logging.Log import org.session.libsignal.service.internal.util.Base64 import org.session.libsignal.service.internal.util.Hex import org.session.libsignal.service.internal.util.JsonUtil +import org.session.libsignal.service.loki.api.LokiDotNetAPI +import org.session.libsignal.service.loki.api.opengroups.PublicChatAPI import org.session.libsignal.service.loki.utilities.DownloadUtilities -import org.session.libsignal.service.loki.utilities.createContext -import org.session.libsignal.service.loki.utilities.hexEncodedPublicKey import org.session.libsignal.service.loki.utilities.retryIfNeeded import java.io.ByteArrayOutputStream import java.text.SimpleDateFormat @@ -25,7 +27,7 @@ import java.util.* object OpenGroupAPI: DotNetAPI() { private val moderators: HashMap>> = hashMapOf() // Server URL to (channel ID to set of moderator IDs) - val sharedContext = Kovenant.createContext("LokiPublicChatAPISharedContext") + val sharedContext = Kovenant.createContext() // region Settings private val fallbackBatchCount = 64 @@ -36,15 +38,15 @@ object OpenGroupAPI: DotNetAPI() { private val channelInfoType = "net.patter-app.settings" private val attachmentType = "net.app.core.oembed" @JvmStatic - public val openGroupMessageType = "network.loki.messenger.publicChat" + val openGroupMessageType = "network.loki.messenger.publicChat" @JvmStatic - public val profilePictureType = "network.loki.messenger.avatar" + val profilePictureType = "network.loki.messenger.avatar" fun getDefaultChats(): List { return listOf() // Don't auto-join any open groups right now } - public fun isUserModerator(hexEncodedPublicKey: String, channel: Long, server: String): Boolean { + fun isUserModerator(hexEncodedPublicKey: String, channel: Long, server: String): Boolean { if (moderators[server] != null && moderators[server]!![channel] != null) { return moderators[server]!![channel]!!.contains(hexEncodedPublicKey) } @@ -53,7 +55,7 @@ object OpenGroupAPI: DotNetAPI() { // endregion // region Public API - public fun getMessages(channel: Long, server: String): Promise, Exception> { + fun getMessages(channel: Long, server: String): Promise, Exception> { Log.d("Loki", "Getting messages for open group with ID: $channel on server: $server.") val storage = MessagingConfiguration.shared.storage val parameters = mutableMapOf( "include_annotations" to 1 ) @@ -158,7 +160,7 @@ object OpenGroupAPI: DotNetAPI() { } } - public fun getDeletedMessageServerIDs(channel: Long, server: String): Promise, Exception> { + fun getDeletedMessageServerIDs(channel: Long, server: String): Promise, Exception> { Log.d("Loki", "Getting deleted messages for open group with ID: $channel on server: $server.") val storage = MessagingConfiguration.shared.storage val parameters = mutableMapOf() @@ -190,12 +192,12 @@ object OpenGroupAPI: DotNetAPI() { } } - public fun sendMessage(message: OpenGroupMessage, channel: Long, server: String): Promise { + fun sendMessage(message: OpenGroupMessage, channel: Long, server: String): Promise { val deferred = deferred() val storage = MessagingConfiguration.shared.storage val userKeyPair = storage.getUserKeyPair() ?: throw Error.Generic val userDisplayName = storage.getUserDisplayName() ?: throw Error.Generic - Thread { + ThreadUtils.queue { val signedMessage = message.sign(userKeyPair.second) if (signedMessage == null) { deferred.reject(Error.SigningFailed) @@ -225,11 +227,11 @@ object OpenGroupAPI: DotNetAPI() { deferred.reject(it) } } - }.start() + } return deferred.promise } - public fun deleteMessage(messageServerID: Long, channel: Long, server: String, isSentByUser: Boolean): Promise { + fun deleteMessage(messageServerID: Long, channel: Long, server: String, isSentByUser: Boolean): Promise { return retryIfNeeded(maxRetryCount) { val isModerationRequest = !isSentByUser Log.d("Loki", "Deleting message with ID: $messageServerID from open group with ID: $channel on server: $server (isModerationRequest = $isModerationRequest).") @@ -241,7 +243,7 @@ object OpenGroupAPI: DotNetAPI() { } } - public fun deleteMessages(messageServerIDs: List, channel: Long, server: String, isSentByUser: Boolean): Promise, Exception> { + fun deleteMessages(messageServerIDs: List, channel: Long, server: String, isSentByUser: Boolean): Promise, Exception> { return retryIfNeeded(maxRetryCount) { val isModerationRequest = !isSentByUser val parameters = mapOf( "ids" to messageServerIDs.joinToString(",") ) @@ -254,7 +256,7 @@ object OpenGroupAPI: DotNetAPI() { } } - public fun getModerators(channel: Long, server: String): Promise, Exception> { + fun getModerators(channel: Long, server: String): Promise, Exception> { return execute(HTTPVerb.GET, server, "loki/v1/channel/$channel/get_moderators").then(sharedContext) { json -> try { @Suppress("UNCHECKED_CAST") val moderators = json["moderators"] as? List @@ -272,7 +274,7 @@ object OpenGroupAPI: DotNetAPI() { } } - public fun getChannelInfo(channel: Long, server: String): Promise { + fun getChannelInfo(channel: Long, server: String): Promise { return retryIfNeeded(maxRetryCount) { val parameters = mapOf( "include_annotations" to 1 ) execute(HTTPVerb.GET, server, "/channels/$channel", parameters = parameters).then(sharedContext) { json -> @@ -296,7 +298,7 @@ object OpenGroupAPI: DotNetAPI() { } } - public fun updateProfileIfNeeded(channel: Long, server: String, groupID: String, info: OpenGroupInfo, isForcedUpdate: Boolean) { + fun updateProfileIfNeeded(channel: Long, server: String, groupID: String, info: OpenGroupInfo, isForcedUpdate: Boolean) { val storage = MessagingConfiguration.shared.storage storage.setUserCount(channel, server, info.memberCount) storage.updateTitle(groupID, info.displayName) @@ -309,7 +311,7 @@ object OpenGroupAPI: DotNetAPI() { } } - public fun downloadOpenGroupProfilePicture(server: String, endpoint: String): ByteArray? { + fun downloadOpenGroupProfilePicture(server: String, endpoint: String): ByteArray? { val url = "${server.removeSuffix("/")}/${endpoint.removePrefix("/")}" Log.d("Loki", "Downloading open group profile picture from \"$url\".") val outputStream = ByteArrayOutputStream() @@ -325,7 +327,7 @@ object OpenGroupAPI: DotNetAPI() { } } - public fun join(channel: Long, server: String): Promise { + fun join(channel: Long, server: String): Promise { return retryIfNeeded(maxRetryCount) { execute(HTTPVerb.POST, server, "/channels/$channel/subscribe").then { Log.d("Loki", "Joined channel with ID: $channel on server: $server.") @@ -333,7 +335,7 @@ object OpenGroupAPI: DotNetAPI() { } } - public fun leave(channel: Long, server: String): Promise { + fun leave(channel: Long, server: String): Promise { return retryIfNeeded(maxRetryCount) { execute(HTTPVerb.DELETE, server, "/channels/$channel/subscribe").then { Log.d("Loki", "Left channel with ID: $channel on server: $server.") @@ -341,7 +343,15 @@ object OpenGroupAPI: DotNetAPI() { } } - public fun getDisplayNames(publicKeys: Set, server: String): Promise, Exception> { + fun ban(publicKey: String, server: String): Promise { + return retryIfNeeded(maxRetryCount) { + execute(HTTPVerb.POST, server, "/loki/v1/moderation/blacklist/@$publicKey").then { + Log.d("Loki", "Banned user with ID: $publicKey from $server") + } + } + } + + fun getDisplayNames(publicKeys: Set, server: String): Promise, Exception> { return getUserProfiles(publicKeys, server, false).map(sharedContext) { json -> val mapping = mutableMapOf() for (user in json) { @@ -355,17 +365,17 @@ object OpenGroupAPI: DotNetAPI() { } } - public fun setDisplayName(newDisplayName: String?, server: String): Promise { + fun setDisplayName(newDisplayName: String?, server: String): Promise { Log.d("Loki", "Updating display name on server: $server.") val parameters = mapOf( "name" to (newDisplayName ?: "") ) return execute(HTTPVerb.PATCH, server, "users/me", parameters = parameters).map { Unit } } - public fun setProfilePicture(server: String, profileKey: ByteArray, url: String?): Promise { + fun setProfilePicture(server: String, profileKey: ByteArray, url: String?): Promise { return setProfilePicture(server, Base64.encodeBytes(profileKey), url) } - public fun setProfilePicture(server: String, profileKey: String, url: String?): Promise { + fun setProfilePicture(server: String, profileKey: String, url: String?): Promise { Log.d("Loki", "Updating profile picture on server: $server.") val value = when (url) { null -> null diff --git a/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupInfo.kt b/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupInfo.kt index 9cd1f18dea..b02431bf26 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupInfo.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupInfo.kt @@ -1,7 +1,7 @@ package org.session.libsession.messaging.opengroups -public data class OpenGroupInfo ( - public val displayName: String, - public val profilePictureURL: String, - public val memberCount: Int +data class OpenGroupInfo ( + val displayName: String, + val profilePictureURL: String, + val memberCount: Int ) diff --git a/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupMessage.kt b/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupMessage.kt index f378d6212d..64e437ecd5 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupMessage.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/opengroups/OpenGroupMessage.kt @@ -7,18 +7,18 @@ import org.session.libsignal.service.internal.util.Hex import org.session.libsignal.service.loki.utilities.removing05PrefixIfNeeded import org.whispersystems.curve25519.Curve25519 -public data class OpenGroupMessage( - public val serverID: Long?, - public val senderPublicKey: String, - public val displayName: String, - public val body: String, - public val timestamp: Long, - public val type: String, - public val quote: Quote?, - public val attachments: MutableList, - public val profilePicture: ProfilePicture?, - public val signature: Signature?, - public val serverTimestamp: Long, +data class OpenGroupMessage( + val serverID: Long?, + val senderPublicKey: String, + val displayName: String, + val body: String, + val timestamp: Long, + val type: String, + val quote: Quote?, + val attachments: MutableList, + val profilePicture: ProfilePicture?, + val signature: Signature?, + val serverTimestamp: Long, ) { // region Settings @@ -98,52 +98,52 @@ public data class OpenGroupMessage( // endregion // region Types - public data class ProfilePicture( - public val profileKey: ByteArray, - public val url: String, + data class ProfilePicture( + val profileKey: ByteArray, + val url: String, ) - public data class Quote( - public val quotedMessageTimestamp: Long, - public val quoteePublicKey: String, - public val quotedMessageBody: String, - public val quotedMessageServerID: Long? = null, + data class Quote( + val quotedMessageTimestamp: Long, + val quoteePublicKey: String, + val quotedMessageBody: String, + val quotedMessageServerID: Long? = null, ) - public data class Signature( - public val data: ByteArray, - public val version: Long, + data class Signature( + val data: ByteArray, + val version: Long, ) - public data class Attachment( - public val kind: Kind, - public val server: String, - public val serverID: Long, - public val contentType: String, - public val size: Int, - public val fileName: String, - public val flags: Int, - public val width: Int, - public val height: Int, - public val caption: String?, - public val url: String, + data class Attachment( + val kind: Kind, + val server: String, + val serverID: Long, + val contentType: String, + val size: Int, + val fileName: String, + val flags: Int, + val width: Int, + val height: Int, + val caption: String?, + val url: String, /** Guaranteed to be non-`nil` if `kind` is `LinkPreview`. */ - public val linkPreviewURL: String?, + val linkPreviewURL: String?, /** Guaranteed to be non-`nil` if `kind` is `LinkPreview`. */ - public val linkPreviewTitle: String?, + val linkPreviewTitle: String?, ) { - public val dotNetAPIType = when { + val dotNetAPIType = when { contentType.startsWith("image") -> "photo" contentType.startsWith("video") -> "video" contentType.startsWith("audio") -> "audio" else -> "other" } - public enum class Kind(val rawValue: String) { + enum class Kind(val rawValue: String) { Attachment("attachment"), LinkPreview("preview") } } diff --git a/libsession/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt b/libsession/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt index b2673f0311..48a9b71bcb 100644 --- a/libsession/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt @@ -14,6 +14,7 @@ import org.session.libsignal.service.loki.api.* import org.session.libsignal.service.loki.api.fileserver.FileServerAPI import org.session.libsignal.service.loki.api.utilities.* import org.session.libsession.utilities.AESGCM.EncryptionResult +import org.session.libsession.utilities.ThreadUtils import org.session.libsession.utilities.getBodyForOnionRequest import org.session.libsession.utilities.getHeadersForOnionRequest import org.session.libsignal.service.loki.utilities.* @@ -83,12 +84,12 @@ object OnionRequestAPI { */ private fun testSnode(snode: Snode): Promise { val deferred = deferred() - Thread { // No need to block the shared context for this + ThreadUtils.queue { // No need to block the shared context for this val url = "${snode.address}:${snode.port}/get_stats/v1" try { val json = HTTP.execute(HTTP.Verb.GET, url) val version = json["version"] as? String - if (version == null) { deferred.reject(Exception("Missing snode version.")); return@Thread } + if (version == null) { deferred.reject(Exception("Missing snode version.")); return@queue } if (version >= "2.0.7") { deferred.resolve(Unit) } else { @@ -99,7 +100,7 @@ object OnionRequestAPI { } catch (exception: Exception) { deferred.reject(exception) } - }.start() + } return deferred.promise } @@ -313,10 +314,10 @@ object OnionRequestAPI { return@success deferred.reject(exception) } val destinationSymmetricKey = result.destinationSymmetricKey - Thread { + ThreadUtils.queue { try { val json = HTTP.execute(HTTP.Verb.POST, url, body) - val base64EncodedIVAndCiphertext = json["result"] as? String ?: return@Thread deferred.reject(Exception("Invalid JSON")) + val base64EncodedIVAndCiphertext = json["result"] as? String ?: return@queue deferred.reject(Exception("Invalid JSON")) val ivAndCiphertext = Base64.decode(base64EncodedIVAndCiphertext) try { val plaintext = AESGCM.decrypt(ivAndCiphertext, destinationSymmetricKey) @@ -326,7 +327,7 @@ object OnionRequestAPI { if (statusCode == 406) { @Suppress("NAME_SHADOWING") val body = mapOf( "result" to "Your clock is out of sync with the service node network." ) val exception = HTTPRequestFailedAtDestinationException(statusCode, body) - return@Thread deferred.reject(exception) + return@queue deferred.reject(exception) } else if (json["body"] != null) { @Suppress("NAME_SHADOWING") val body: Map<*, *> if (json["body"] is Map<*, *>) { @@ -341,13 +342,13 @@ object OnionRequestAPI { } if (statusCode != 200) { val exception = HTTPRequestFailedAtDestinationException(statusCode, body) - return@Thread deferred.reject(exception) + return@queue deferred.reject(exception) } deferred.resolve(body) } else { if (statusCode != 200) { val exception = HTTPRequestFailedAtDestinationException(statusCode, json) - return@Thread deferred.reject(exception) + return@queue deferred.reject(exception) } deferred.resolve(json) } @@ -360,7 +361,7 @@ object OnionRequestAPI { } catch (exception: Exception) { deferred.reject(exception) } - }.start() + } }.fail { exception -> deferred.reject(exception) } diff --git a/libsession/src/main/java/org/session/libsession/snode/OnionRequestEncryption.kt b/libsession/src/main/java/org/session/libsession/snode/OnionRequestEncryption.kt index ed907e5b08..b9fe4f57e0 100644 --- a/libsession/src/main/java/org/session/libsession/snode/OnionRequestEncryption.kt +++ b/libsession/src/main/java/org/session/libsession/snode/OnionRequestEncryption.kt @@ -5,6 +5,7 @@ import nl.komponents.kovenant.deferred import org.session.libsignal.service.internal.util.JsonUtil import org.session.libsession.utilities.AESGCM.EncryptionResult import org.session.libsession.utilities.AESGCM +import org.session.libsession.utilities.ThreadUtils import org.session.libsignal.service.loki.utilities.toHexString import java.nio.Buffer import java.nio.ByteBuffer @@ -32,7 +33,7 @@ object OnionRequestEncryption { */ internal fun encryptPayloadForDestination(payload: Map<*, *>, destination: OnionRequestAPI.Destination): Promise { val deferred = deferred() - Thread { + ThreadUtils.queue { try { // Wrapping isn't needed for file server or open group onion requests when (destination) { @@ -52,7 +53,7 @@ object OnionRequestEncryption { } catch (exception: Exception) { deferred.reject(exception) } - }.start() + } return deferred.promise } diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index 3e3bac9f1c..85c40e51d2 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -7,24 +7,22 @@ import nl.komponents.kovenant.functional.bind import nl.komponents.kovenant.functional.map import org.session.libsession.snode.utilities.getRandomElement +import org.session.libsession.utilities.ThreadUtils +import org.session.libsession.utilities.createContext import org.session.libsignal.libsignal.logging.Log -import org.session.libsignal.service.internal.util.Base64 -import org.session.libsignal.service.loki.api.MessageWrapper import org.session.libsignal.service.loki.api.utilities.HTTP -import org.session.libsignal.service.loki.utilities.createContext import org.session.libsignal.service.loki.utilities.prettifiedDescription import org.session.libsignal.service.loki.utilities.retryIfNeeded -import org.session.libsignal.service.internal.push.SignalServiceProtos.Envelope import java.security.SecureRandom object SnodeAPI { val database = SnodeConfiguration.shared.storage val broadcaster = SnodeConfiguration.shared.broadcaster - val sharedContext = Kovenant.createContext("LokiAPISharedContext") - val messageSendingContext = Kovenant.createContext("LokiAPIMessageSendingContext") - val messagePollingContext = Kovenant.createContext("LokiAPIMessagePollingContext") + val sharedContext = Kovenant.createContext() + val messageSendingContext = Kovenant.createContext() + val messagePollingContext = Kovenant.createContext() internal var snodeFailureCount: MutableMap = mutableMapOf() internal var snodePool: Set @@ -57,7 +55,7 @@ object SnodeAPI { return OnionRequestAPI.sendOnionRequest(method, parameters, snode, publicKey) } else { val deferred = deferred, Exception>() - Thread { + ThreadUtils.queue { val payload = mapOf( "method" to method.rawValue, "params" to parameters ) try { val json = HTTP.execute(HTTP.Verb.POST, url, payload) @@ -66,12 +64,12 @@ object SnodeAPI { val httpRequestFailedException = exception as? HTTP.HTTPRequestFailedException if (httpRequestFailedException != null) { val error = handleSnodeError(httpRequestFailedException.statusCode, httpRequestFailedException.json, snode, publicKey) - if (error != null) { return@Thread deferred.reject(exception) } + if (error != null) { return@queue deferred.reject(exception) } } Log.d("Loki", "Unhandled exception: $exception.") deferred.reject(exception) } - }.start() + } return deferred.promise } } @@ -91,7 +89,7 @@ object SnodeAPI { ) val deferred = deferred() deferred(SnodeAPI.sharedContext) - Thread { + ThreadUtils.queue { try { val json = HTTP.execute(HTTP.Verb.POST, url, parameters, useSeedNodeConnection = true) val intermediate = json["result"] as? Map<*, *> @@ -125,7 +123,7 @@ object SnodeAPI { } catch (exception: Exception) { deferred.reject(exception) } - }.start() + } return deferred.promise } else { return Promise.of(snodePool.getRandomElement()) diff --git a/libsession/src/main/java/org/session/libsession/utilities/PromiseUtilities.kt b/libsession/src/main/java/org/session/libsession/utilities/PromiseUtilities.kt index e9455ef3e3..225611d8b3 100644 --- a/libsession/src/main/java/org/session/libsession/utilities/PromiseUtilities.kt +++ b/libsession/src/main/java/org/session/libsession/utilities/PromiseUtilities.kt @@ -1,11 +1,48 @@ @file:JvmName("PromiseUtilities") package org.session.libsession.utilities +import nl.komponents.kovenant.Context +import nl.komponents.kovenant.Kovenant import nl.komponents.kovenant.Promise import nl.komponents.kovenant.deferred +import nl.komponents.kovenant.jvm.asDispatcher import org.session.libsignal.libsignal.logging.Log +import java.util.concurrent.Executors import java.util.concurrent.TimeoutException +fun Kovenant.createContext(): Context { + return createContext { + callbackContext.dispatcher = Executors.newSingleThreadExecutor().asDispatcher() + workerContext.dispatcher = ThreadUtils.executorPool.asDispatcher() + multipleCompletion = { v1, v2 -> + Log.d("Loki", "Promise resolved more than once (first with $v1, then with $v2); ignoring $v2.") + } + } +} + +fun Promise.get(defaultValue: V): V { + return try { + get() + } catch (e: Exception) { + defaultValue + } +} + +fun Promise.recover(callback: (exception: E) -> V): Promise { + val deferred = deferred() + success { + deferred.resolve(it) + }.fail { + try { + val value = callback(it) + deferred.resolve(value) + } catch (e: Throwable) { + deferred.reject(it) + } + } + return deferred.promise +} + fun Promise.successBackground(callback: (value: V) -> Unit): Promise { Thread { try { diff --git a/libsession/src/main/java/org/session/libsession/utilities/ThreadUtil.java b/libsession/src/main/java/org/session/libsession/utilities/ThreadUtil.java deleted file mode 100644 index cdc5c6286a..0000000000 --- a/libsession/src/main/java/org/session/libsession/utilities/ThreadUtil.java +++ /dev/null @@ -1,19 +0,0 @@ -package org.session.libsession.utilities; - -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -public class ThreadUtil { - - public static ExecutorService newDynamicSingleThreadedExecutor() { - ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, - new LinkedBlockingQueue()); - executor.allowCoreThreadTimeOut(true); - - return executor; - } - -} diff --git a/libsession/src/main/java/org/session/libsession/utilities/ThreadUtils.kt b/libsession/src/main/java/org/session/libsession/utilities/ThreadUtils.kt new file mode 100644 index 0000000000..e952d76008 --- /dev/null +++ b/libsession/src/main/java/org/session/libsession/utilities/ThreadUtils.kt @@ -0,0 +1,26 @@ +package org.session.libsession.utilities + +import java.util.concurrent.* + +object ThreadUtils { + + internal val executorPool = Executors.newCachedThreadPool() + + @JvmStatic + fun queue(target: Runnable) { + executorPool.execute(target) + } + + fun queue(target: () -> Unit) { + executorPool.execute(target) + } + + @JvmStatic + fun newDynamicSingleThreadedExecutor(): ExecutorService { + val executor = ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, + LinkedBlockingQueue()) + executor.allowCoreThreadTimeOut(true) + return executor + } + +} \ No newline at end of file