diff --git a/libsignal/src/main/java/org/session/libsignal/service/loki/api/LokiMessage.kt b/libsignal/src/main/java/org/session/libsignal/service/loki/api/LokiMessage.kt index 3353a29ffb..01fc55440e 100644 --- a/libsignal/src/main/java/org/session/libsignal/service/loki/api/LokiMessage.kt +++ b/libsignal/src/main/java/org/session/libsignal/service/loki/api/LokiMessage.kt @@ -6,6 +6,7 @@ import org.session.libsignal.libsignal.logging.Log import org.session.libsignal.service.internal.util.Base64 import org.session.libsignal.service.loki.api.crypto.ProofOfWork import org.session.libsignal.service.loki.protocol.meta.TTLUtilities +import org.session.libsignal.service.loki.utilities.ThreadUtils import org.session.libsignal.service.loki.utilities.prettifiedDescription internal data class LokiMessage( @@ -60,7 +61,7 @@ internal data class LokiMessage( internal fun calculatePoW(): Promise { val deferred = deferred() // Run PoW in a background thread - Thread { + ThreadUtils.queue { val now = System.currentTimeMillis() val nonce = ProofOfWork.calculate(data, recipientPublicKey, now, ttl) if (nonce != null ) { @@ -68,7 +69,7 @@ internal data class LokiMessage( } else { deferred.reject(SnodeAPI.Error.ProofOfWorkCalculationFailed) } - }.start() + } return deferred.promise } diff --git a/libsignal/src/main/java/org/session/libsignal/service/loki/api/SnodeAPI.kt b/libsignal/src/main/java/org/session/libsignal/service/loki/api/SnodeAPI.kt index 384c0c36c1..da618c92ee 100644 --- a/libsignal/src/main/java/org/session/libsignal/service/loki/api/SnodeAPI.kt +++ b/libsignal/src/main/java/org/session/libsignal/service/loki/api/SnodeAPI.kt @@ -12,22 +12,19 @@ import org.session.libsignal.service.internal.util.Base64 import org.session.libsignal.service.loki.api.onionrequests.OnionRequestAPI import org.session.libsignal.service.loki.api.utilities.HTTP import org.session.libsignal.service.loki.database.LokiAPIDatabaseProtocol -import org.session.libsignal.service.loki.utilities.Broadcaster -import org.session.libsignal.service.loki.utilities.createContext -import org.session.libsignal.service.loki.utilities.prettifiedDescription -import org.session.libsignal.service.loki.utilities.retryIfNeeded +import org.session.libsignal.service.loki.utilities.* import java.net.ConnectException import java.net.SocketTimeoutException class SnodeAPI private constructor(public var userPublicKey: String, public val database: LokiAPIDatabaseProtocol, public val broadcaster: Broadcaster) { companion object { - val messageSendingContext = Kovenant.createContext("LokiAPIMessageSendingContext") - val messagePollingContext = Kovenant.createContext("LokiAPIMessagePollingContext") + val messageSendingContext = Kovenant.createContext() + val messagePollingContext = Kovenant.createContext() /** * For operations that are shared between message sending and message polling. */ - val sharedContext = Kovenant.createContext("LokiAPISharedContext") + val sharedContext = Kovenant.createContext() // region Initialization lateinit var shared: SnodeAPI @@ -75,7 +72,7 @@ class SnodeAPI private constructor(public var userPublicKey: String, public val return OnionRequestAPI.sendOnionRequest(method, parameters, snode, publicKey) } else { val deferred = deferred, Exception>() - Thread { + ThreadUtils.queue { val payload = mapOf( "method" to method.rawValue, "params" to parameters ) try { val json = HTTP.execute(HTTP.Verb.POST, url, payload) @@ -87,13 +84,13 @@ class SnodeAPI private constructor(public var userPublicKey: String, public val val httpRequestFailedException = exception as? HTTP.HTTPRequestFailedException if (httpRequestFailedException != null) { @Suppress("NAME_SHADOWING") val exception = handleSnodeError(httpRequestFailedException.statusCode, httpRequestFailedException.json, snode, publicKey) - return@Thread deferred.reject(exception) + return@queue deferred.reject(exception) } Log.d("Loki", "Unhandled exception: $exception.") } deferred.reject(exception) } - }.start() + } return deferred.promise } } diff --git a/libsignal/src/main/java/org/session/libsignal/service/loki/api/SwarmAPI.kt b/libsignal/src/main/java/org/session/libsignal/service/loki/api/SwarmAPI.kt index 0fd7d1379b..918d6bf250 100644 --- a/libsignal/src/main/java/org/session/libsignal/service/loki/api/SwarmAPI.kt +++ b/libsignal/src/main/java/org/session/libsignal/service/loki/api/SwarmAPI.kt @@ -8,6 +8,7 @@ import nl.komponents.kovenant.task import org.session.libsignal.libsignal.logging.Log import org.session.libsignal.service.loki.api.utilities.HTTP import org.session.libsignal.service.loki.database.LokiAPIDatabaseProtocol +import org.session.libsignal.service.loki.utilities.ThreadUtils import org.session.libsignal.service.loki.utilities.getRandomElement import org.session.libsignal.service.loki.utilities.prettifiedDescription import org.session.libsignal.service.loki.utilities.retryIfNeeded @@ -67,7 +68,7 @@ class SwarmAPI private constructor(private val database: LokiAPIDatabaseProtocol ) val deferred = deferred() deferred(SnodeAPI.sharedContext) - Thread { + ThreadUtils.queue { try { val json = HTTP.execute(HTTP.Verb.POST, url, parameters, useSeedNodeConnection = true) val intermediate = json["result"] as? Map<*, *> @@ -101,7 +102,7 @@ class SwarmAPI private constructor(private val database: LokiAPIDatabaseProtocol } catch (exception: Exception) { deferred.reject(exception) } - }.start() + } return deferred.promise } else { return Promise.of(snodePool.getRandomElement()) diff --git a/libsignal/src/main/java/org/session/libsignal/service/loki/api/onionrequests/OnionRequestAPI.kt b/libsignal/src/main/java/org/session/libsignal/service/loki/api/onionrequests/OnionRequestAPI.kt index acc783be7b..b49de68902 100644 --- a/libsignal/src/main/java/org/session/libsignal/service/loki/api/onionrequests/OnionRequestAPI.kt +++ b/libsignal/src/main/java/org/session/libsignal/service/loki/api/onionrequests/OnionRequestAPI.kt @@ -82,12 +82,12 @@ public object OnionRequestAPI { */ private fun testSnode(snode: Snode): Promise { val deferred = deferred() - Thread { // No need to block the shared context for this + ThreadUtils.queue { // No need to block the shared context for this val url = "${snode.address}:${snode.port}/get_stats/v1" try { val json = HTTP.execute(HTTP.Verb.GET, url) val version = json["version"] as? String - if (version == null) { deferred.reject(Exception("Missing snode version.")); return@Thread } + if (version == null) { deferred.reject(Exception("Missing snode version.")); return@queue } if (version >= "2.0.7") { deferred.resolve(Unit) } else { @@ -98,7 +98,7 @@ public object OnionRequestAPI { } catch (exception: Exception) { deferred.reject(exception) } - }.start() + } return deferred.promise } @@ -312,10 +312,10 @@ public object OnionRequestAPI { return@success deferred.reject(exception) } val destinationSymmetricKey = result.destinationSymmetricKey - Thread { + ThreadUtils.queue { try { val json = HTTP.execute(HTTP.Verb.POST, url, body) - val base64EncodedIVAndCiphertext = json["result"] as? String ?: return@Thread deferred.reject(Exception("Invalid JSON")) + val base64EncodedIVAndCiphertext = json["result"] as? String ?: return@queue deferred.reject(Exception("Invalid JSON")) val ivAndCiphertext = Base64.decode(base64EncodedIVAndCiphertext) try { val plaintext = DecryptionUtilities.decryptUsingAESGCM(ivAndCiphertext, destinationSymmetricKey) @@ -325,7 +325,7 @@ public object OnionRequestAPI { if (statusCode == 406) { @Suppress("NAME_SHADOWING") val body = mapOf( "result" to "Your clock is out of sync with the service node network." ) val exception = HTTPRequestFailedAtDestinationException(statusCode, body) - return@Thread deferred.reject(exception) + return@queue deferred.reject(exception) } else if (json["body"] != null) { @Suppress("NAME_SHADOWING") val body: Map<*, *> if (json["body"] is Map<*, *>) { @@ -340,13 +340,13 @@ public object OnionRequestAPI { } if (statusCode != 200) { val exception = HTTPRequestFailedAtDestinationException(statusCode, body) - return@Thread deferred.reject(exception) + return@queue deferred.reject(exception) } deferred.resolve(body) } else { if (statusCode != 200) { val exception = HTTPRequestFailedAtDestinationException(statusCode, json) - return@Thread deferred.reject(exception) + return@queue deferred.reject(exception) } deferred.resolve(json) } @@ -359,7 +359,7 @@ public object OnionRequestAPI { } catch (exception: Exception) { deferred.reject(exception) } - }.start() + } }.fail { exception -> deferred.reject(exception) } diff --git a/libsignal/src/main/java/org/session/libsignal/service/loki/api/onionrequests/OnionRequestEncryption.kt b/libsignal/src/main/java/org/session/libsignal/service/loki/api/onionrequests/OnionRequestEncryption.kt index fad358998b..d601f68ec4 100644 --- a/libsignal/src/main/java/org/session/libsignal/service/loki/api/onionrequests/OnionRequestEncryption.kt +++ b/libsignal/src/main/java/org/session/libsignal/service/loki/api/onionrequests/OnionRequestEncryption.kt @@ -5,6 +5,7 @@ import nl.komponents.kovenant.deferred import org.session.libsignal.service.internal.util.JsonUtil import org.session.libsignal.service.loki.api.utilities.EncryptionResult import org.session.libsignal.service.loki.api.utilities.EncryptionUtilities +import org.session.libsignal.service.loki.utilities.ThreadUtils import org.session.libsignal.service.loki.utilities.toHexString import java.nio.Buffer import java.nio.ByteBuffer @@ -32,7 +33,7 @@ object OnionRequestEncryption { */ internal fun encryptPayloadForDestination(payload: Map<*, *>, destination: OnionRequestAPI.Destination): Promise { val deferred = deferred() - Thread { + ThreadUtils.queue { try { // Wrapping isn't needed for file server or open group onion requests when (destination) { @@ -52,7 +53,7 @@ object OnionRequestEncryption { } catch (exception: Exception) { deferred.reject(exception) } - }.start() + } return deferred.promise } @@ -61,7 +62,7 @@ object OnionRequestEncryption { */ internal fun encryptHop(lhs: OnionRequestAPI.Destination, rhs: OnionRequestAPI.Destination, previousEncryptionResult: EncryptionResult): Promise { val deferred = deferred() - Thread { + ThreadUtils.queue { try { val payload: MutableMap when (rhs) { @@ -88,7 +89,7 @@ object OnionRequestEncryption { } catch (exception: Exception) { deferred.reject(exception) } - }.start() + } return deferred.promise } } diff --git a/libsignal/src/main/java/org/session/libsignal/service/loki/api/opengroups/PublicChatAPI.kt b/libsignal/src/main/java/org/session/libsignal/service/loki/api/opengroups/PublicChatAPI.kt index 5ca9cb787d..c4fcb0c2c2 100644 --- a/libsignal/src/main/java/org/session/libsignal/service/loki/api/opengroups/PublicChatAPI.kt +++ b/libsignal/src/main/java/org/session/libsignal/service/loki/api/opengroups/PublicChatAPI.kt @@ -16,6 +16,7 @@ import org.session.libsignal.service.loki.database.LokiAPIDatabaseProtocol import org.session.libsignal.service.loki.database.LokiOpenGroupDatabaseProtocol import org.session.libsignal.service.loki.database.LokiUserDatabaseProtocol import org.session.libsignal.service.loki.utilities.DownloadUtilities +import org.session.libsignal.service.loki.utilities.ThreadUtils import org.session.libsignal.service.loki.utilities.createContext import org.session.libsignal.service.loki.utilities.retryIfNeeded import java.io.ByteArrayOutputStream @@ -27,7 +28,7 @@ class PublicChatAPI(userPublicKey: String, private val userPrivateKey: ByteArray companion object { private val moderators: HashMap>> = hashMapOf() // Server URL to (channel ID to set of moderator IDs) - val sharedContext = Kovenant.createContext("LokiPublicChatAPISharedContext") + val sharedContext = Kovenant.createContext() // region Settings private val fallbackBatchCount = 64 @@ -38,15 +39,15 @@ class PublicChatAPI(userPublicKey: String, private val userPrivateKey: ByteArray private val channelInfoType = "net.patter-app.settings" private val attachmentType = "net.app.core.oembed" @JvmStatic - public val publicChatMessageType = "network.loki.messenger.publicChat" + val publicChatMessageType = "network.loki.messenger.publicChat" @JvmStatic - public val profilePictureType = "network.loki.messenger.avatar" + val profilePictureType = "network.loki.messenger.avatar" fun getDefaultChats(): List { return listOf() // Don't auto-join any open groups right now } - public fun isUserModerator(hexEncodedPublicKey: String, channel: Long, server: String): Boolean { + fun isUserModerator(hexEncodedPublicKey: String, channel: Long, server: String): Boolean { if (moderators[server] != null && moderators[server]!![channel] != null) { return moderators[server]!![channel]!!.contains(hexEncodedPublicKey) } @@ -56,7 +57,7 @@ class PublicChatAPI(userPublicKey: String, private val userPrivateKey: ByteArray } // region Public API - public fun getMessages(channel: Long, server: String): Promise, Exception> { + fun getMessages(channel: Long, server: String): Promise, Exception> { Log.d("Loki", "Getting messages for open group with ID: $channel on server: $server.") val parameters = mutableMapOf( "include_annotations" to 1 ) val lastMessageServerID = apiDatabase.getLastMessageServerID(channel, server) @@ -160,7 +161,7 @@ class PublicChatAPI(userPublicKey: String, private val userPrivateKey: ByteArray } } - public fun getDeletedMessageServerIDs(channel: Long, server: String): Promise, Exception> { + fun getDeletedMessageServerIDs(channel: Long, server: String): Promise, Exception> { Log.d("Loki", "Getting deleted messages for open group with ID: $channel on server: $server.") val parameters = mutableMapOf() val lastDeletionServerID = apiDatabase.getLastDeletionServerID(channel, server) @@ -191,9 +192,9 @@ class PublicChatAPI(userPublicKey: String, private val userPrivateKey: ByteArray } } - public fun sendMessage(message: PublicChatMessage, channel: Long, server: String): Promise { + fun sendMessage(message: PublicChatMessage, channel: Long, server: String): Promise { val deferred = deferred() - Thread { + ThreadUtils.queue { val signedMessage = message.sign(userPrivateKey) if (signedMessage == null) { deferred.reject(SnodeAPI.Error.MessageSigningFailed) @@ -224,11 +225,11 @@ class PublicChatAPI(userPublicKey: String, private val userPrivateKey: ByteArray deferred.reject(it) } } - }.start() + } return deferred.promise } - public fun deleteMessage(messageServerID: Long, channel: Long, server: String, isSentByUser: Boolean): Promise { + fun deleteMessage(messageServerID: Long, channel: Long, server: String, isSentByUser: Boolean): Promise { return retryIfNeeded(maxRetryCount) { val isModerationRequest = !isSentByUser Log.d("Loki", "Deleting message with ID: $messageServerID from open group with ID: $channel on server: $server (isModerationRequest = $isModerationRequest).") @@ -240,7 +241,7 @@ class PublicChatAPI(userPublicKey: String, private val userPrivateKey: ByteArray } } - public fun deleteMessages(messageServerIDs: List, channel: Long, server: String, isSentByUser: Boolean): Promise, Exception> { + fun deleteMessages(messageServerIDs: List, channel: Long, server: String, isSentByUser: Boolean): Promise, Exception> { return retryIfNeeded(maxRetryCount) { val isModerationRequest = !isSentByUser val parameters = mapOf( "ids" to messageServerIDs.joinToString(",") ) @@ -253,7 +254,7 @@ class PublicChatAPI(userPublicKey: String, private val userPrivateKey: ByteArray } } - public fun getModerators(channel: Long, server: String): Promise, Exception> { + fun getModerators(channel: Long, server: String): Promise, Exception> { return execute(HTTPVerb.GET, server, "loki/v1/channel/$channel/get_moderators").then(sharedContext) { json -> try { @Suppress("UNCHECKED_CAST") val moderators = json["moderators"] as? List @@ -271,7 +272,7 @@ class PublicChatAPI(userPublicKey: String, private val userPrivateKey: ByteArray } } - public fun getChannelInfo(channel: Long, server: String): Promise { + fun getChannelInfo(channel: Long, server: String): Promise { return retryIfNeeded(maxRetryCount) { val parameters = mapOf( "include_annotations" to 1 ) execute(HTTPVerb.GET, server, "/channels/$channel", parameters = parameters).then(sharedContext) { json -> @@ -295,7 +296,7 @@ class PublicChatAPI(userPublicKey: String, private val userPrivateKey: ByteArray } } - public fun updateProfileIfNeeded(channel: Long, server: String, groupID: String, info: PublicChatInfo, isForcedUpdate: Boolean) { + fun updateProfileIfNeeded(channel: Long, server: String, groupID: String, info: PublicChatInfo, isForcedUpdate: Boolean) { apiDatabase.setUserCount(channel, server, info.memberCount) openGroupDatabase.updateTitle(groupID, info.displayName) // Download and update profile picture if needed @@ -307,7 +308,7 @@ class PublicChatAPI(userPublicKey: String, private val userPrivateKey: ByteArray } } - public fun downloadOpenGroupProfilePicture(server: String, endpoint: String): ByteArray? { + fun downloadOpenGroupProfilePicture(server: String, endpoint: String): ByteArray? { val url = "${server.removeSuffix("/")}/${endpoint.removePrefix("/")}" Log.d("Loki", "Downloading open group profile picture from \"$url\".") val outputStream = ByteArrayOutputStream() @@ -323,7 +324,7 @@ class PublicChatAPI(userPublicKey: String, private val userPrivateKey: ByteArray } } - public fun join(channel: Long, server: String): Promise { + fun join(channel: Long, server: String): Promise { return retryIfNeeded(maxRetryCount) { execute(HTTPVerb.POST, server, "/channels/$channel/subscribe").then { Log.d("Loki", "Joined channel with ID: $channel on server: $server.") @@ -331,7 +332,7 @@ class PublicChatAPI(userPublicKey: String, private val userPrivateKey: ByteArray } } - public fun leave(channel: Long, server: String): Promise { + fun leave(channel: Long, server: String): Promise { return retryIfNeeded(maxRetryCount) { execute(HTTPVerb.DELETE, server, "/channels/$channel/subscribe").then { Log.d("Loki", "Left channel with ID: $channel on server: $server.") @@ -339,7 +340,15 @@ class PublicChatAPI(userPublicKey: String, private val userPrivateKey: ByteArray } } - public fun getDisplayNames(publicKeys: Set, server: String): Promise, Exception> { + fun ban(publicKey: String, server: String): Promise { + return retryIfNeeded(maxRetryCount) { + execute(HTTPVerb.POST, server, "/loki/v1/moderation/blacklist/@$publicKey").then { + Log.d("Loki", "Banned user with ID: $publicKey from $server") + } + } + } + + fun getDisplayNames(publicKeys: Set, server: String): Promise, Exception> { return getUserProfiles(publicKeys, server, false).map(sharedContext) { json -> val mapping = mutableMapOf() for (user in json) { @@ -353,17 +362,17 @@ class PublicChatAPI(userPublicKey: String, private val userPrivateKey: ByteArray } } - public fun setDisplayName(newDisplayName: String?, server: String): Promise { + fun setDisplayName(newDisplayName: String?, server: String): Promise { Log.d("Loki", "Updating display name on server: $server.") val parameters = mapOf( "name" to (newDisplayName ?: "") ) return execute(HTTPVerb.PATCH, server, "users/me", parameters = parameters).map { Unit } } - public fun setProfilePicture(server: String, profileKey: ByteArray, url: String?): Promise { + fun setProfilePicture(server: String, profileKey: ByteArray, url: String?): Promise { return setProfilePicture(server, Base64.encodeBytes(profileKey), url) } - public fun setProfilePicture(server: String, profileKey: String, url: String?): Promise { + fun setProfilePicture(server: String, profileKey: String, url: String?): Promise { Log.d("Loki", "Updating profile picture on server: $server.") val value = when (url) { null -> null diff --git a/libsignal/src/main/java/org/session/libsignal/service/loki/utilities/PromiseUtilities.kt b/libsignal/src/main/java/org/session/libsignal/service/loki/utilities/PromiseUtilities.kt index 8e8ddf6987..d7b268c279 100644 --- a/libsignal/src/main/java/org/session/libsignal/service/loki/utilities/PromiseUtilities.kt +++ b/libsignal/src/main/java/org/session/libsignal/service/loki/utilities/PromiseUtilities.kt @@ -2,25 +2,14 @@ package org.session.libsignal.service.loki.utilities import nl.komponents.kovenant.* +import nl.komponents.kovenant.jvm.asDispatcher import org.session.libsignal.libsignal.logging.Log -import kotlin.math.max +import java.util.concurrent.Executors -// Try to use all available threads minus one for the callback -private val recommendedThreadCount: Int - get() = Runtime.getRuntime().availableProcessors() - 1 - -fun Kovenant.createContext(contextName: String, threadCount: Int = max(recommendedThreadCount, 1)): Context { +fun Kovenant.createContext(): Context { return createContext { - callbackContext.dispatcher = buildDispatcher { - name = "${contextName}CallbackDispatcher" - // Ref: http://kovenant.komponents.nl/api/core_usage/#execution-order - // Having 1 concurrent task ensures we have in-order callback handling - concurrentTasks = 1 - } - workerContext.dispatcher = buildDispatcher { - name = "${contextName}WorkerDispatcher" - concurrentTasks = threadCount - } + callbackContext.dispatcher = Executors.newSingleThreadExecutor().asDispatcher() + workerContext.dispatcher = ThreadUtils.executorPool.asDispatcher() multipleCompletion = { v1, v2 -> Log.d("Loki", "Promise resolved more than once (first with $v1, then with $v2); ignoring $v2.") } diff --git a/libsignal/src/main/java/org/session/libsignal/service/loki/utilities/ThreadUtils.kt b/libsignal/src/main/java/org/session/libsignal/service/loki/utilities/ThreadUtils.kt new file mode 100644 index 0000000000..8d102afd74 --- /dev/null +++ b/libsignal/src/main/java/org/session/libsignal/service/loki/utilities/ThreadUtils.kt @@ -0,0 +1,18 @@ +package org.session.libsignal.service.loki.utilities + +import java.util.concurrent.Executors + +object ThreadUtils { + + internal val executorPool = Executors.newCachedThreadPool() + + @JvmStatic + fun queue(target: Runnable) { + executorPool.execute(target) + } + + fun queue(target: ()->Unit) { + executorPool.execute(target) + } + +} \ No newline at end of file