Getting rid of .get call on promise

This commit is contained in:
SessionHero01
2024-10-02 11:25:37 +10:00
parent 45a66d0eea
commit 3faae5ddbe
27 changed files with 363 additions and 344 deletions

View File

@@ -6,10 +6,14 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import network.loki.messenger.libsession_util.util.ConfigPush
import org.session.libsession.database.StorageProtocol
import org.session.libsession.database.userAuth
@@ -29,6 +33,7 @@ import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import org.session.libsignal.utilities.Snode
import java.util.concurrent.Executors
import javax.inject.Inject
private const val TAG = "ConfigSyncHandler"
@@ -44,32 +49,35 @@ class ConfigSyncHandler @Inject constructor(
) {
private var job: Job? = null
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
@OptIn(DelicateCoroutinesApi::class)
fun start() {
require(job == null) { "Already started" }
job = GlobalScope.launch {
val groupDispatchers = hashMapOf<AccountId, CoroutineDispatcher>()
val userConfigDispatcher = Dispatchers.Default.limitedParallelism(1)
val groupMutex = hashMapOf<AccountId, Mutex>()
val userMutex = Mutex()
configFactory.configUpdateNotifications.collect { changes ->
configFactory.configUpdateNotifications
.collect { changes ->
try {
when (changes) {
is ConfigUpdateNotification.GroupConfigsDeleted -> {
groupDispatchers.remove(changes.groupId)
groupMutex.remove(changes.groupId)
}
is ConfigUpdateNotification.GroupConfigsUpdated -> {
// Group config pushing is limited to its own dispatcher
launch(groupDispatchers.getOrPut(changes.groupId) {
Dispatchers.Default.limitedParallelism(1)
}) {
pushGroupConfigsChangesIfNeeded(changes.groupId)
launch {
groupMutex.getOrPut(changes.groupId) { Mutex() }.withLock {
pushGroupConfigsChangesIfNeeded(changes.groupId)
}
}
}
ConfigUpdateNotification.UserConfigs -> launch(userConfigDispatcher) {
pushUserConfigChangesIfNeeded()
ConfigUpdateNotification.UserConfigs -> launch {
userMutex.withLock {
pushUserConfigChangesIfNeeded()
}
}
}
} catch (e: Exception) {

View File

@@ -12,6 +12,7 @@ import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.open_groups.OpenGroupApi
import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.DecodedAudio
import org.session.libsession.utilities.InputStreamMediaDataSource
import org.session.libsession.utilities.UploadResult
@@ -76,7 +77,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
}
}
private fun upload(attachment: SignalServiceAttachmentStream, server: String, encrypt: Boolean, upload: (ByteArray) -> Promise<Long, Exception>): Pair<ByteArray, UploadResult> {
private suspend fun upload(attachment: SignalServiceAttachmentStream, server: String, encrypt: Boolean, upload: (ByteArray) -> Promise<Long, Exception>): Pair<ByteArray, UploadResult> {
// Key
val key = if (encrypt) Util.getSecretBytes(64) else ByteArray(0)
// Length
@@ -102,7 +103,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
drb.writeTo(b)
val data = b.readByteArray()
// Upload the data
val id = upload(data).get()
val id = upload(data).await()
val digest = drb.transmittedDigest
// Return
return Pair(key, UploadResult(id, "${server}/file/$id", digest))

View File

@@ -24,7 +24,7 @@ class JobQueue : JobDelegate {
private var hasResumedPendingJobs = false // Just for debugging
private val jobTimestampMap = ConcurrentHashMap<Long, AtomicInteger>()
private val scope = GlobalScope
private val scope: CoroutineScope = GlobalScope
private val queue = Channel<Job>(UNLIMITED)
private val pendingJobIds = mutableSetOf<String>()
@@ -34,9 +34,8 @@ class JobQueue : JobDelegate {
private fun CoroutineScope.processWithOpenGroupDispatcher(
channel: Channel<Job>,
dispatcher: CoroutineDispatcher,
name: String
) = launch(dispatcher) {
) = launch {
for (job in channel) {
if (!isActive) break
val openGroupId = when (job) {
@@ -54,7 +53,7 @@ class JobQueue : JobDelegate {
val groupChannel = if (!openGroupChannels.containsKey(openGroupId)) {
Log.d("OpenGroupDispatcher", "Creating ${openGroupId.hashCode()} channel")
val newGroupChannel = Channel<Job>(UNLIMITED)
launch(dispatcher) {
launch {
for (groupJob in newGroupChannel) {
if (!isActive) break
groupJob.process(name)
@@ -74,14 +73,13 @@ class JobQueue : JobDelegate {
private fun CoroutineScope.processWithDispatcher(
channel: Channel<Job>,
dispatcher: CoroutineDispatcher,
name: String,
asynchronous: Boolean = true
) = launch(dispatcher) {
) = launch {
for (job in channel) {
if (!isActive) break
if (asynchronous) {
launch(dispatcher) {
launch {
job.process(name)
}
} else {
@@ -111,10 +109,10 @@ class JobQueue : JobDelegate {
val mediaQueue = Channel<Job>(capacity = UNLIMITED)
val openGroupQueue = Channel<Job>(capacity = UNLIMITED)
val receiveJob = processWithDispatcher(rxQueue, Dispatchers.Default.limitedParallelism(1), "rx", asynchronous = false)
val txJob = processWithDispatcher(txQueue, Dispatchers.Default.limitedParallelism(1), "tx")
val mediaJob = processWithDispatcher(mediaQueue, Dispatchers.Default.limitedParallelism(4), "media")
val openGroupJob = processWithOpenGroupDispatcher(openGroupQueue, Dispatchers.Default.limitedParallelism(8), "openGroup")
val receiveJob = processWithDispatcher(rxQueue, "rx", asynchronous = false)
val txJob = processWithDispatcher(txQueue, "tx")
val mediaJob = processWithDispatcher(mediaQueue, "media")
val openGroupJob = processWithOpenGroupDispatcher(openGroupQueue, "openGroup")
while (isActive) {
when (val job = queue.receive()) {

View File

@@ -7,6 +7,7 @@ import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.sending_receiving.MessageReceiver
import org.session.libsession.messaging.sending_receiving.handle
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.snode.utilities.await
import org.session.libsignal.utilities.Log
class MessageReceiveJob(val data: ByteArray, val serverHash: String? = null, val openGroupMessageServerID: Long? = null, val openGroupID: String? = null) : Job {
@@ -27,7 +28,7 @@ class MessageReceiveJob(val data: ByteArray, val serverHash: String? = null, val
}
override suspend fun execute(dispatcherName: String) {
executeAsync(dispatcherName).get()
executeAsync(dispatcherName).await()
}
fun executeAsync(dispatcherName: String): Promise<Unit, Exception> {

View File

@@ -8,7 +8,9 @@ import com.fasterxml.jackson.databind.annotation.JsonNaming
import com.fasterxml.jackson.databind.type.TypeFactory
import com.goterl.lazysodium.interfaces.GenericHash
import com.goterl.lazysodium.interfaces.Sign
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.functional.map
import okhttp3.Headers.Companion.toHeaders
@@ -22,6 +24,8 @@ import org.session.libsession.messaging.utilities.SodiumUtilities.sodium
import org.session.libsession.snode.OnionRequestAPI
import org.session.libsession.snode.OnionResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.utilities.asyncPromise
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Base64.decode
@@ -858,7 +862,9 @@ object OpenGroupApi {
}
fun getDefaultRoomsIfNeeded(): Promise<List<DefaultGroup>, Exception> {
return getAllRooms().map { groups ->
return GlobalScope.asyncPromise {
val groups = getAllRooms().await()
val earlyGroups = groups.map { group ->
DefaultGroup(group.token, group.name, null)
}
@@ -873,15 +879,13 @@ object OpenGroupApi {
}
groups.map { group ->
val image = try {
images[group.token]!!.get()
images[group.token]!!.await()
} catch (e: Exception) {
// No image or image failed to download
null
}
DefaultGroup(group.token, group.name, image)
}
}.success { new ->
defaultRooms.tryEmit(new)
}.also(defaultRooms::tryEmit)
}
}

View File

@@ -3,6 +3,7 @@
package org.session.libsession.messaging.sending_receiving
import com.google.protobuf.ByteString
import kotlinx.coroutines.GlobalScope
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
import org.session.libsession.messaging.MessagingModuleConfiguration
@@ -14,6 +15,8 @@ import org.session.libsession.messaging.sending_receiving.MessageSender.Error
import org.session.libsession.messaging.sending_receiving.notifications.PushRegistryV1
import org.session.libsession.messaging.sending_receiving.pollers.LegacyClosedGroupPollerV2
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.utilities.asyncPromise
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.Address
import org.session.libsession.utilities.Address.Companion.fromSerialized
import org.session.libsession.utilities.Device
@@ -41,10 +44,8 @@ fun MessageSender.create(
name: String,
members: Collection<String>
): Promise<String, Exception> {
val deferred = deferred<String, Exception>()
ThreadUtils.queue {
return GlobalScope.asyncPromise {
// Prepare
val context = MessagingModuleConfiguration.shared.context
val storage = MessagingModuleConfiguration.shared.storage
val userPublicKey = storage.getUserPublicKey()!!
val membersAsData = members.map { ByteString.copyFrom(Hex.fromStringCondensed(it)) }
@@ -83,7 +84,7 @@ fun MessageSender.create(
val closedGroupControlMessage = ClosedGroupControlMessage(closedGroupUpdateKind, groupID)
closedGroupControlMessage.sentTimestamp = sentTime
try {
sendNonDurably(closedGroupControlMessage, Address.fromSerialized(member), member == ourPubKey).get()
sendNonDurably(closedGroupControlMessage, Address.fromSerialized(member), member == ourPubKey).await()
} catch (e: Exception) {
// We failed to properly create the group so delete it's associated data (in the past
// we didn't create this data until the messages successfully sent but this resulted
@@ -91,8 +92,7 @@ fun MessageSender.create(
storage.removeClosedGroupPublicKey(groupPublicKey)
storage.removeAllClosedGroupEncryptionKeyPairs(groupPublicKey)
storage.deleteConversation(threadID)
deferred.reject(e)
return@queue
throw e
}
}
@@ -102,11 +102,8 @@ fun MessageSender.create(
PushRegistryV1.register(device = device, publicKey = userPublicKey)
// Start polling
LegacyClosedGroupPollerV2.shared.startPolling(groupPublicKey)
// Fulfill the promise
deferred.resolve(groupID)
groupID
}
// Return
return deferred.promise
}
fun MessageSender.setName(groupPublicKey: String, newName: String) {

View File

@@ -1,5 +1,6 @@
package org.session.libsession.messaging.sending_receiving.pollers
import kotlinx.coroutines.GlobalScope
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map
@@ -9,6 +10,8 @@ import org.session.libsession.messaging.jobs.BatchMessageReceiveJob
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveParameters
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.utilities.asyncPromise
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.GroupUtil
import org.session.libsignal.crypto.secureRandomOrNull
import org.session.libsignal.utilities.Log
@@ -110,13 +113,13 @@ class LegacyClosedGroupPollerV2 {
when {
currentForkInfo.defaultRequiresAuth() -> SnodeAPI.getUnauthenticatedRawMessages(snode, groupPublicKey, namespace = Namespace.UNAUTHENTICATED_CLOSED_GROUP())
.map { SnodeAPI.parseRawMessagesResponse(it, snode, groupPublicKey, Namespace.UNAUTHENTICATED_CLOSED_GROUP()) }
currentForkInfo.hasNamespaces() -> task {
currentForkInfo.hasNamespaces() -> GlobalScope.asyncPromise {
val unAuthed = SnodeAPI.getUnauthenticatedRawMessages(snode, groupPublicKey, namespace = Namespace.UNAUTHENTICATED_CLOSED_GROUP())
.map { SnodeAPI.parseRawMessagesResponse(it, snode, groupPublicKey, Namespace.UNAUTHENTICATED_CLOSED_GROUP()) }
val default = SnodeAPI.getUnauthenticatedRawMessages(snode, groupPublicKey, namespace = Namespace.DEFAULT())
.map { SnodeAPI.parseRawMessagesResponse(it, snode, groupPublicKey, Namespace.DEFAULT()) }
val unAuthedResult = unAuthed.get()
val defaultResult = default.get()
val unAuthedResult = unAuthed.await()
val defaultResult = default.await()
val format = DateFormat.getTimeInstance()
if (unAuthedResult.isNotEmpty() || defaultResult.isNotEmpty()) {
Log.d("Poller", "@${format.format(Date())}Polled ${unAuthedResult.size} from -10, ${defaultResult.size} from 0")

View File

@@ -25,12 +25,12 @@ import org.session.libsession.messaging.sending_receiving.MessageReceiver
import org.session.libsession.messaging.sending_receiving.handle
import org.session.libsession.messaging.sending_receiving.handleOpenGroupReactions
import org.session.libsession.snode.OnionRequestAPI
import org.session.libsession.snode.utilities.successBackground
import org.session.libsession.utilities.Address
import org.session.libsession.utilities.GroupUtil
import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.successBackground
import java.util.UUID
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture

View File

@@ -2,14 +2,12 @@ package org.session.libsession.messaging.sending_receiving.pollers
import android.util.SparseArray
import androidx.core.util.valueIterator
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.GlobalScope
import nl.komponents.kovenant.Deferred
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.resolve
import nl.komponents.kovenant.task
import org.session.libsession.database.StorageProtocol
import org.session.libsession.database.userAuth
import org.session.libsession.messaging.MessagingModuleConfiguration
@@ -19,6 +17,7 @@ import org.session.libsession.messaging.jobs.MessageReceiveParameters
import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeModule
import org.session.libsession.snode.utilities.asyncPromise
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.ConfigMessage
import org.session.libsession.utilities.UserConfigType
@@ -177,7 +176,7 @@ class Poller(
return poll(snode, deferred)
}
private fun pollUserProfile(snode: Snode, deferred: Deferred<Unit, Exception>): Promise<Unit, Exception> = task {
private fun pollUserProfile(snode: Snode, deferred: Deferred<Unit, Exception>): Promise<Unit, Exception> = GlobalScope.asyncPromise {
val requests = mutableListOf<SnodeAPI.SnodeBatchRequestInfo>()
val hashesToExtend = mutableSetOf<String>()
val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth)
@@ -236,116 +235,114 @@ class Poller(
private fun poll(snode: Snode, deferred: Deferred<Unit, Exception>): Promise<Unit, Exception> {
if (!hasStarted) { return Promise.ofFail(PromiseCanceledException()) }
return task {
runBlocking(Dispatchers.IO) {
val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth)
val requestSparseArray = SparseArray<SnodeAPI.SnodeBatchRequestInfo>()
// get messages
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode = snode,
publicKey = userAuth.accountId.hexString,
namespace = Namespace.DEFAULT()
),
auth = userAuth,
maxSize = -2)
.also { personalMessages ->
// namespaces here should always be set
requestSparseArray[personalMessages.namespace!!] = personalMessages
}
// get the latest convo info volatile
val hashesToExtend = mutableSetOf<String>()
configFactory.withUserConfigs { configs ->
UserConfigType
.entries
.map { type ->
val config = configs.getConfig(type)
hashesToExtend += config.currentHashes()
type.namespace to SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode = snode,
publicKey = userAuth.accountId.hexString,
namespace = type.namespace
),
auth = userAuth,
namespace = type.namespace,
maxSize = -8
)
}
}.forEach { (namespace, request) ->
// namespaces here should always be set
requestSparseArray[namespace] = request
}
val requests =
requestSparseArray.valueIterator().asSequence().toMutableList()
if (hashesToExtend.isNotEmpty()) {
SnodeAPI.buildAuthenticatedAlterTtlBatchRequest(
messageHashes = hashesToExtend.toList(),
auth = userAuth,
newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds,
extend = true
).let { extensionRequest ->
requests += extensionRequest
return GlobalScope.asyncPromise {
val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth)
val requestSparseArray = SparseArray<SnodeAPI.SnodeBatchRequestInfo>()
// get messages
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode = snode,
publicKey = userAuth.accountId.hexString,
namespace = Namespace.DEFAULT()
),
auth = userAuth,
maxSize = -2)
.also { personalMessages ->
// namespaces here should always be set
requestSparseArray[personalMessages.namespace!!] = personalMessages
}
// get the latest convo info volatile
val hashesToExtend = mutableSetOf<String>()
configFactory.withUserConfigs { configs ->
UserConfigType
.entries
.map { type ->
val config = configs.getConfig(type)
hashesToExtend += config.currentHashes()
type.namespace to SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode = snode,
publicKey = userAuth.accountId.hexString,
namespace = type.namespace
),
auth = userAuth,
namespace = type.namespace,
maxSize = -8
)
}
}.forEach { (namespace, request) ->
// namespaces here should always be set
requestSparseArray[namespace] = request
}
val requests =
requestSparseArray.valueIterator().asSequence().toMutableList()
if (hashesToExtend.isNotEmpty()) {
SnodeAPI.buildAuthenticatedAlterTtlBatchRequest(
messageHashes = hashesToExtend.toList(),
auth = userAuth,
newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds,
extend = true
).let { extensionRequest ->
requests += extensionRequest
}
}
if (requests.isNotEmpty()) {
SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses ->
isCaughtUp = true
if (deferred.promise.isDone()) {
return@bind Promise.ofSuccess(Unit)
} else {
val responseList = (rawResponses["results"] as List<RawResponse>)
// in case we had null configs, the array won't be fully populated
// index of the sparse array key iterator should be the request index, with the key being the namespace
UserConfigType.entries
.map { type -> type to requestSparseArray.indexOfKey(type.namespace) }
.filter { (_, i) -> i >= 0 }
.forEach { (configType, requestIndex) ->
responseList.getOrNull(requestIndex)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) {
Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
return@forEach
}
val body = rawResponse["body"] as? RawResponse
if (body == null) {
Log.e(TAG, "Batch sub-request didn't contain a body")
return@forEach
}
processConfig(snode, body, configType)
}
}
// the first response will be the personal messages (we want these to be processed after config messages)
val personalResponseIndex = requestSparseArray.indexOfKey(Namespace.DEFAULT())
if (personalResponseIndex >= 0) {
responseList.getOrNull(personalResponseIndex)?.let { rawResponse ->
if (requests.isNotEmpty()) {
SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses ->
isCaughtUp = true
if (deferred.promise.isDone()) {
return@bind Promise.ofSuccess(Unit)
} else {
val responseList = (rawResponses["results"] as List<RawResponse>)
// in case we had null configs, the array won't be fully populated
// index of the sparse array key iterator should be the request index, with the key being the namespace
UserConfigType.entries
.map { type -> type to requestSparseArray.indexOfKey(type.namespace) }
.filter { (_, i) -> i >= 0 }
.forEach { (configType, requestIndex) ->
responseList.getOrNull(requestIndex)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) {
Log.e(TAG, "Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
// If we got a non-success response then the snode might be bad so we should try rotate
// to a different one just in case
pollNextSnode(deferred = deferred)
return@bind Promise.ofSuccess(Unit)
} else {
val body = rawResponse["body"] as? RawResponse
if (body == null) {
Log.e(TAG, "Batch sub-request for personal messages didn't contain a body")
} else {
processPersonalMessages(snode, body)
}
Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
return@forEach
}
val body = rawResponse["body"] as? RawResponse
if (body == null) {
Log.e(TAG, "Batch sub-request didn't contain a body")
return@forEach
}
processConfig(snode, body, configType)
}
}
poll(snode, deferred)
// the first response will be the personal messages (we want these to be processed after config messages)
val personalResponseIndex = requestSparseArray.indexOfKey(Namespace.DEFAULT())
if (personalResponseIndex >= 0) {
responseList.getOrNull(personalResponseIndex)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) {
Log.e(TAG, "Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
// If we got a non-success response then the snode might be bad so we should try rotate
// to a different one just in case
pollNextSnode(deferred = deferred)
return@bind Promise.ofSuccess(Unit)
} else {
val body = rawResponse["body"] as? RawResponse
if (body == null) {
Log.e(TAG, "Batch sub-request for personal messages didn't contain a body")
} else {
processPersonalMessages(snode, body)
}
}
}
}
}.fail {
Log.e(TAG, "Failed to get raw batch response", it)
poll(snode, deferred)
}
}.fail {
Log.e(TAG, "Failed to get raw batch response", it)
poll(snode, deferred)
}
}
}

View File

@@ -1,5 +1,8 @@
package org.session.libsession.snode
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import nl.komponents.kovenant.Deferred
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.all
@@ -8,6 +11,7 @@ import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map
import okhttp3.Request
import org.session.libsession.messaging.file_server.FileServerApi
import org.session.libsession.snode.utilities.asyncPromise
import org.session.libsession.utilities.AESGCM
import org.session.libsession.utilities.AESGCM.EncryptionResult
import org.session.libsession.utilities.getBodyForOnionRequest
@@ -27,6 +31,7 @@ import org.session.libsignal.utilities.recover
import org.session.libsignal.utilities.toHexString
import java.util.concurrent.atomic.AtomicReference
import kotlin.collections.set
import kotlin.coroutines.EmptyCoroutineContext
private typealias Path = List<Snode>
@@ -112,26 +117,14 @@ object OnionRequestAPI {
* Tests the given snode. The returned promise errors out if the snode is faulty; the promise is fulfilled otherwise.
*/
private fun testSnode(snode: Snode): Promise<Unit, Exception> {
val deferred = deferred<Unit, Exception>()
ThreadUtils.queue { // No need to block the shared context for this
return GlobalScope.asyncPromise { // No need to block the shared context for this
val url = "${snode.address}:${snode.port}/get_stats/v1"
try {
val response = HTTP.execute(HTTP.Verb.GET, url, 3).decodeToString()
val json = JsonUtil.fromJson(response, Map::class.java)
val version = json["version"] as? String
if (version == null) { deferred.reject(Exception("Missing snode version.")); return@queue }
if (version >= "2.0.7") {
deferred.resolve(Unit)
} else {
val message = "Unsupported snode version: $version."
Log.d("Loki", message)
deferred.reject(Exception(message))
}
} catch (exception: Exception) {
deferred.reject(exception)
}
val response = HTTP.execute(HTTP.Verb.GET, url, 3).decodeToString()
val json = JsonUtil.fromJson(response, Map::class.java)
val version = json["version"] as? String
require(version != null) { "Missing snode version." }
require(version >= "2.0.7") { "Unsupported snode version: $version." }
}
return deferred.promise
}
/**
@@ -359,7 +352,7 @@ object OnionRequestAPI {
return@success deferred.reject(exception)
}
val destinationSymmetricKey = result.destinationSymmetricKey
ThreadUtils.queue {
GlobalScope.launch {
try {
val response = HTTP.execute(HTTP.Verb.POST, url, body)
handleResponse(response, destinationSymmetricKey, destination, version, deferred)

View File

@@ -1,8 +1,10 @@
package org.session.libsession.snode
import kotlinx.coroutines.GlobalScope
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
import org.session.libsession.snode.OnionRequestAPI.Destination
import org.session.libsession.snode.utilities.asyncPromise
import org.session.libsession.utilities.AESGCM
import org.session.libsession.utilities.AESGCM.EncryptionResult
import org.session.libsignal.utilities.toHexString
@@ -37,68 +39,56 @@ object OnionRequestEncryption {
destination: Destination,
version: Version
): Promise<EncryptionResult, Exception> {
val deferred = deferred<EncryptionResult, Exception>()
ThreadUtils.queue {
try {
val plaintext = if (version == Version.V4) {
payload
} else {
// Wrapping isn't needed for file server or open group onion requests
when (destination) {
is Destination.Snode -> encode(payload, mapOf("headers" to ""))
is Destination.Server -> payload
}
return GlobalScope.asyncPromise {
val plaintext = if (version == Version.V4) {
payload
} else {
// Wrapping isn't needed for file server or open group onion requests
when (destination) {
is Destination.Snode -> encode(payload, mapOf("headers" to ""))
is Destination.Server -> payload
}
val x25519PublicKey = when (destination) {
is Destination.Snode -> destination.snode.publicKeySet!!.x25519Key
is Destination.Server -> destination.x25519PublicKey
}
val result = AESGCM.encrypt(plaintext, x25519PublicKey)
deferred.resolve(result)
} catch (exception: Exception) {
deferred.reject(exception)
}
val x25519PublicKey = when (destination) {
is Destination.Snode -> destination.snode.publicKeySet!!.x25519Key
is Destination.Server -> destination.x25519PublicKey
}
AESGCM.encrypt(plaintext, x25519PublicKey)
}
return deferred.promise
}
/**
* Encrypts the previous encryption result (i.e. that of the hop after this one) for this hop. Use this to build the layers of an onion request.
*/
internal fun encryptHop(lhs: Destination, rhs: Destination, previousEncryptionResult: EncryptionResult): Promise<EncryptionResult, Exception> {
val deferred = deferred<EncryptionResult, Exception>()
ThreadUtils.queue {
try {
val payload: MutableMap<String, Any> = when (rhs) {
is Destination.Snode -> {
mutableMapOf( "destination" to rhs.snode.publicKeySet!!.ed25519Key )
}
is Destination.Server -> {
mutableMapOf(
"host" to rhs.host,
"target" to rhs.target,
"method" to "POST",
"protocol" to rhs.scheme,
"port" to rhs.port
)
}
return GlobalScope.asyncPromise {
val payload: MutableMap<String, Any> = when (rhs) {
is Destination.Snode -> {
mutableMapOf("destination" to rhs.snode.publicKeySet!!.ed25519Key)
}
payload["ephemeral_key"] = previousEncryptionResult.ephemeralPublicKey.toHexString()
val x25519PublicKey = when (lhs) {
is Destination.Snode -> {
lhs.snode.publicKeySet!!.x25519Key
}
is Destination.Server -> {
lhs.x25519PublicKey
}
is Destination.Server -> {
mutableMapOf(
"host" to rhs.host,
"target" to rhs.target,
"method" to "POST",
"protocol" to rhs.scheme,
"port" to rhs.port
)
}
val plaintext = encode(previousEncryptionResult.ciphertext, payload)
val result = AESGCM.encrypt(plaintext, x25519PublicKey)
deferred.resolve(result)
} catch (exception: Exception) {
deferred.reject(exception)
}
payload["ephemeral_key"] = previousEncryptionResult.ephemeralPublicKey.toHexString()
val x25519PublicKey = when (lhs) {
is Destination.Snode -> {
lhs.snode.publicKeySet!!.x25519Key
}
is Destination.Server -> {
lhs.x25519PublicKey
}
}
val plaintext = encode(previousEncryptionResult.ciphertext, payload)
AESGCM.encrypt(plaintext, x25519PublicKey)
}
return deferred.promise
}
}

View File

@@ -9,6 +9,7 @@ import com.goterl.lazysodium.interfaces.GenericHash
import com.goterl.lazysodium.interfaces.PwHash
import com.goterl.lazysodium.interfaces.SecretBox
import com.goterl.lazysodium.utils.Key
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
@@ -27,6 +28,7 @@ import nl.komponents.kovenant.unwrap
import org.session.libsession.messaging.utilities.MessageWrapper
import org.session.libsession.messaging.utilities.SodiumUtilities.sodium
import org.session.libsession.snode.model.BatchResponse
import org.session.libsession.snode.utilities.asyncPromise
import org.session.libsession.snode.utilities.await
import org.session.libsession.snode.utilities.retrySuspendAsPromise
import org.session.libsession.utilities.mapValuesNotNull
@@ -137,7 +139,7 @@ object SnodeAPI {
JsonUtil.fromJson(it.body ?: throw Error.Generic, Map::class.java)
}
else -> task {
else -> GlobalScope.asyncPromise {
HTTP.execute(
HTTP.Verb.POST,
url = "${snode.address}:${snode.port}/storage_rpc/v1",
@@ -169,17 +171,15 @@ object SnodeAPI {
JsonUtil.fromJson(resp.body ?: throw Error.Generic, responseClass)
}
else -> withContext(Dispatchers.IO) {
HTTP.execute(
HTTP.Verb.POST,
url = "${snode.address}:${snode.port}/storage_rpc/v1",
parameters = buildMap {
this["method"] = method.rawValue
this["params"] = parameters
}
).toString().let {
JsonUtil.fromJson(it, responseClass)
else -> HTTP.execute(
HTTP.Verb.POST,
url = "${snode.address}:${snode.port}/storage_rpc/v1",
parameters = buildMap {
this["method"] = method.rawValue
this["params"] = parameters
}
).toString().let {
JsonUtil.fromJson(it, responseClass)
}
}
@@ -192,7 +192,7 @@ object SnodeAPI {
}
internal fun getRandomSnode(): Promise<Snode, Exception> =
snodePool.takeIf { it.size >= minimumSnodePoolCount }?.secureRandom()?.let { Promise.of(it) } ?: task {
snodePool.takeIf { it.size >= minimumSnodePoolCount }?.secureRandom()?.let { Promise.of(it) } ?: GlobalScope.asyncPromise {
val target = seedNodePool.random()
Log.d("Loki", "Populating snode pool using: $target.")
val url = "$target/json_rpc"
@@ -241,7 +241,7 @@ object SnodeAPI {
}
// Public API
fun getAccountID(onsName: String): Promise<String, Exception> = task {
fun getAccountID(onsName: String): Promise<String, Exception> = GlobalScope.asyncPromise {
val validationCount = 3
val accountIDByteCount = 33
// Hash the ONS name using BLAKE2b
@@ -630,11 +630,16 @@ object SnodeAPI {
getBatchResponse(
snode = snode,
publicKey = batch.first().publicKey,
requests = batch.map { it.request }, sequence = false
requests = batch.mapNotNull { info ->
info.request.takeIf { !info.callback.isClosedForSend }
},
sequence = false
)
} catch (e: Exception) {
for (req in batch) {
req.callback.send(Result.failure(e))
runCatching {
req.callback.send(Result.failure(e))
}
}
return@batch
}
@@ -650,7 +655,9 @@ object SnodeAPI {
JsonUtil.fromJson(resp.body, req.responseType)
}
req.callback.send(result)
runCatching{
req.callback.send(result)
}
}
// Close all channels in the requests just in case we don't have paired up
@@ -673,7 +680,14 @@ object SnodeAPI {
val callback = Channel<Result<T>>()
@Suppress("UNCHECKED_CAST")
batchedRequestsSender.send(RequestInfo(snode, publicKey, request, responseType, callback as SendChannel<Any>))
return callback.receive().getOrThrow()
try {
return callback.receive().getOrThrow()
} catch (e: CancellationException) {
// Close the channel if the coroutine is cancelled, so the batch processing won't
// handle this one (best effort only)
callback.close()
throw e
}
}
suspend fun sendBatchRequest(

View File

@@ -2,10 +2,14 @@ package org.session.libsession.snode.utilities
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
import org.session.libsignal.utilities.Log
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
@@ -17,9 +21,20 @@ suspend fun <T, E: Throwable> Promise<T, E>.await(): T {
}
}
fun <T> CoroutineScope.asyncPromise(block: suspend () -> T): Promise<T, Exception> {
fun <V, E: Throwable> Promise<V, E>.successBackground(callback: (value: V) -> Unit): Promise<V, E> {
GlobalScope.launch {
try {
callback(this@successBackground.await())
} catch (e: Exception) {
Log.d("Loki", "Failed to execute task in background: ${e.message}.")
}
}
return this
}
fun <T> CoroutineScope.asyncPromise(context: CoroutineContext = EmptyCoroutineContext, block: suspend () -> T): Promise<T, Exception> {
val defer = deferred<T, Exception>()
launch {
launch(context) {
try {
defer.resolve(block())
} catch (e: Exception) {

View File

@@ -1,8 +1,11 @@
package org.session.libsession.utilities
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import okhttp3.HttpUrl
import okhttp3.HttpUrl.Companion.toHttpUrlOrNull
import org.session.libsession.messaging.file_server.FileServerApi
import org.session.libsession.snode.utilities.await
import org.session.libsignal.utilities.HTTP
import org.session.libsignal.utilities.Log
import java.io.File
@@ -14,8 +17,7 @@ object DownloadUtilities {
/**
* Blocks the calling thread.
*/
@JvmStatic
fun downloadFile(destination: File, url: String) {
suspend fun downloadFile(destination: File, url: String) {
val outputStream = FileOutputStream(destination) // Throws
var remainingAttempts = 2
var exception: Exception? = null
@@ -35,13 +37,13 @@ object DownloadUtilities {
/**
* Blocks the calling thread.
*/
@JvmStatic
fun downloadFile(outputStream: OutputStream, urlAsString: String) {
suspend fun downloadFile(outputStream: OutputStream, urlAsString: String) {
val url = urlAsString.toHttpUrlOrNull()!!
val fileID = url.pathSegments.last()
try {
FileServerApi.download(fileID).get().let {
outputStream.write(it)
val data = FileServerApi.download(fileID).await()
withContext(Dispatchers.IO) {
outputStream.write(data)
}
} catch (e: Exception) {
when (e) {