Merge branch 'dev' of https://github.com/loki-project/session-android into zombie-handling-update

This commit is contained in:
Brice-W
2021-06-03 11:51:57 +10:00
66 changed files with 3486 additions and 1085 deletions

View File

@@ -12,8 +12,8 @@ import org.session.libsession.messaging.messages.visible.Attachment
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.messaging.open_groups.OpenGroupV2
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentId
import org.session.libsession.messaging.sending_receiving.data_extraction.DataExtractionNotificationInfoMessage
import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment
import org.session.libsession.messaging.sending_receiving.data_extraction.DataExtractionNotificationInfoMessage
import org.session.libsession.messaging.sending_receiving.link_preview.LinkPreview
import org.session.libsession.messaging.sending_receiving.quotes.QuoteModel
import org.session.libsession.utilities.Address
@@ -130,6 +130,7 @@ interface StorageProtocol {
fun getThreadId(recipient: Recipient): Long?
fun getThreadIdForMms(mmsId: Long): Long
fun getLastUpdated(threadID: Long): Long
fun trimThread(threadID: Long, threadLimit: Int)
// Contacts
fun getContactWithSessionID(sessionID: String): Contact?

View File

@@ -37,9 +37,9 @@ class JobQueue : JobDelegate {
init {
// Process jobs
scope.launch {
val rxQueue = Channel<Job>(capacity = 1024)
val txQueue = Channel<Job>(capacity = 1024)
val attachmentQueue = Channel<Job>(capacity = 1024)
val rxQueue = Channel<Job>(capacity = 4096)
val txQueue = Channel<Job>(capacity = 4096)
val attachmentQueue = Channel<Job>(capacity = 4096)
val receiveJob = processWithDispatcher(rxQueue, rxDispatcher)
val txJob = processWithDispatcher(txQueue, txDispatcher)
@@ -50,7 +50,7 @@ class JobQueue : JobDelegate {
when (job) {
is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> txQueue.send(job)
is AttachmentDownloadJob -> attachmentQueue.send(job)
is MessageReceiveJob -> rxQueue.send(job)
is MessageReceiveJob, is TrimThreadJob -> rxQueue.send(job)
else -> throw IllegalStateException("Unexpected job type.")
}
}

View File

@@ -10,7 +10,8 @@ class SessionJobManagerFactories {
AttachmentUploadJob.KEY to AttachmentUploadJob.Factory(),
MessageReceiveJob.KEY to MessageReceiveJob.Factory(),
MessageSendJob.KEY to MessageSendJob.Factory(),
NotifyPNServerJob.KEY to NotifyPNServerJob.Factory()
NotifyPNServerJob.KEY to NotifyPNServerJob.Factory(),
TrimThreadJob.KEY to TrimThreadJob.Factory()
)
}
}

View File

@@ -0,0 +1,44 @@
package org.session.libsession.messaging.jobs
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.utilities.TextSecurePreferences
class TrimThreadJob(val threadId: Long) : Job {
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
override val maxFailureCount: Int = 1
companion object {
const val KEY: String = "TrimThreadJob"
const val THREAD_ID = "thread_id"
}
override fun execute() {
val context = MessagingModuleConfiguration.shared.context
val trimmingEnabled = TextSecurePreferences.isThreadLengthTrimmingEnabled(context)
val threadLengthLimit = TextSecurePreferences.getThreadTrimLength(context)
if (trimmingEnabled) {
MessagingModuleConfiguration.shared.storage.trimThread(threadId, threadLengthLimit)
}
delegate?.handleJobSucceeded(this)
}
override fun serialize(): Data {
return Data.Builder()
.putLong(THREAD_ID, threadId)
.build()
}
override fun getFactoryKey(): String = "TrimThreadJob"
class Factory : Job.Factory<TrimThreadJob> {
override fun create(data: Data): TrimThreadJob {
return TrimThreadJob(data.getLong(THREAD_ID))
}
}
}

View File

@@ -1,5 +1,6 @@
package org.session.libsession.messaging.open_groups
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.PropertyNamingStrategy
import com.fasterxml.jackson.databind.annotation.JsonNaming
import com.fasterxml.jackson.databind.type.TypeFactory
@@ -17,14 +18,9 @@ import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPolle
import org.session.libsession.snode.OnionRequestAPI
import org.session.libsession.utilities.AESGCM
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsignal.utilities.HTTP
import org.session.libsignal.utilities.HTTP.Verb.*
import org.session.libsignal.utilities.removing05PrefixIfNeeded
import org.session.libsignal.utilities.toHexString
import org.session.libsignal.utilities.*
import org.session.libsignal.utilities.Base64.*
import org.session.libsignal.utilities.Hex
import org.session.libsignal.utilities.JsonUtil
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.HTTP.Verb.*
import org.whispersystems.curve25519.Curve25519
import java.util.*
@@ -63,11 +59,13 @@ object OpenGroupAPIV2 {
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy::class)
data class CompactPollRequest(val roomID: String, val authToken: String, val fromDeletionServerID: Long?, val fromMessageServerID: Long?)
data class CompactPollResult(val messages: List<OpenGroupMessageV2>, val deletions: List<Long>, val moderators: List<String>)
data class CompactPollResult(val messages: List<OpenGroupMessageV2>, val deletions: List<MessageDeletion>, val moderators: List<String>)
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy::class)
data class MessageDeletion
@JvmOverloads constructor(val id: Long = 0, val deletedMessageId: Long = 0
data class MessageDeletion(
@JsonProperty("id")
val id: Long = 0,
@JsonProperty("deleted_message_id")
val deletedMessageServerID: Long = 0
) {
companion object {
@@ -237,7 +235,10 @@ object OpenGroupAPIV2 {
return send(request).map { json ->
@Suppress("UNCHECKED_CAST") val rawMessage = json["message"] as? Map<String, Any>
?: throw Error.ParsingFailed
OpenGroupMessageV2.fromJSON(rawMessage) ?: throw Error.ParsingFailed
val result = OpenGroupMessageV2.fromJSON(rawMessage) ?: throw Error.ParsingFailed
val storage = MessagingModuleConfiguration.shared.storage
storage.addReceivedMessageTimestamp(result.sentTimestamp)
result
}
}
// endregion
@@ -258,9 +259,6 @@ object OpenGroupAPIV2 {
}
private fun parseMessages(room: String, server: String, rawMessages: List<Map<*, *>>): List<OpenGroupMessageV2> {
val storage = MessagingModuleConfiguration.shared.storage
val lastMessageServerID = storage.getLastMessageServerID(room, server) ?: 0
var currentLastMessageServerID = lastMessageServerID
val messages = rawMessages.mapNotNull { json ->
json as Map<String, Any>
try {
@@ -275,15 +273,11 @@ object OpenGroupAPIV2 {
Log.d("Loki", "Ignoring message with invalid signature.")
return@mapNotNull null
}
if (message.serverID > lastMessageServerID) {
currentLastMessageServerID = message.serverID
}
message
} catch (e: Exception) {
null
}
}
storage.setLastMessageServerID(room, server, currentLastMessageServerID)
return messages
}
// endregion
@@ -403,18 +397,13 @@ object OpenGroupAPIV2 {
// Deletions
val type = TypeFactory.defaultInstance().constructCollectionType(List::class.java, MessageDeletion::class.java)
val idsAsString = JsonUtil.toJson(json["deletions"])
val deletedServerIDs = JsonUtil.fromJson<List<MessageDeletion>>(idsAsString, type) ?: throw Error.ParsingFailed
val lastDeletionServerID = storage.getLastDeletionServerID(roomID, server) ?: 0
val serverID = deletedServerIDs.maxByOrNull { it.id } ?: MessageDeletion.empty
if (serverID.id > lastDeletionServerID) {
storage.setLastDeletionServerID(roomID, server, serverID.id)
}
val deletions = JsonUtil.fromJson<List<MessageDeletion>>(idsAsString, type) ?: throw Error.ParsingFailed
// Messages
val rawMessages = json["messages"] as? List<Map<String, Any>> ?: return@mapNotNull null
val messages = parseMessages(roomID, server, rawMessages)
roomID to CompactPollResult(
messages = messages,
deletions = deletedServerIDs.map { it.deletedMessageId },
deletions = deletions,
moderators = moderators
)
}.toMap()

View File

@@ -234,7 +234,7 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
}
// Cancel any typing indicators if needed
cancelTypingIndicatorsIfNeeded(message.sender!!)
//Notify the user if needed
// Notify the user if needed
SSKEnvironment.shared.notificationManager.updateNotification(context, threadID)
}
//endregion

View File

@@ -5,6 +5,7 @@ import nl.komponents.kovenant.functional.map
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.jobs.TrimThreadJob
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
import org.session.libsession.messaging.open_groups.OpenGroupMessageV2
import org.session.libsession.utilities.Address
@@ -15,6 +16,7 @@ import org.session.libsignal.utilities.successBackground
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import kotlin.math.max
class OpenGroupPollerV2(private val server: String, private val executorService: ScheduledExecutorService?) {
var hasStarted = false
@@ -44,8 +46,8 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
return OpenGroupAPIV2.compactPoll(rooms, server).successBackground { responses ->
responses.forEach { (room, response) ->
val openGroupID = "$server.$room"
handleNewMessages(openGroupID, response.messages, isBackgroundPoll)
handleDeletedMessages(openGroupID, response.deletions)
handleNewMessages(room, openGroupID, response.messages, isBackgroundPoll)
handleDeletedMessages(room, openGroupID, response.deletions)
if (secondToLastJob == null && !isCaughtUp) {
isCaughtUp = true
}
@@ -55,8 +57,13 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
}.map { }
}
private fun handleNewMessages(openGroupID: String, messages: List<OpenGroupMessageV2>, isBackgroundPoll: Boolean) {
if (!hasStarted) { return }
private fun handleNewMessages(room: String, openGroupID: String, messages: List<OpenGroupMessageV2>, isBackgroundPoll: Boolean) {
val storage = MessagingModuleConfiguration.shared.storage
val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())
// check thread still exists
val threadId = storage.getThreadId(Address.fromSerialized(groupID)) ?: -1
val threadExists = threadId >= 0
if (!hasStarted || !threadExists) { return }
var latestJob: MessageReceiveJob? = null
messages.sortedBy { it.serverID!! }.forEach { message ->
try {
@@ -82,22 +89,33 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
Log.e("Loki", "Exception parsing message", e)
}
}
val currentLastMessageServerID = storage.getLastMessageServerID(room, server) ?: 0
val actualMax = max(messages.mapNotNull { it.serverID }.maxOrNull() ?: 0, currentLastMessageServerID)
if (actualMax > 0) {
storage.setLastMessageServerID(room, server, actualMax)
}
JobQueue.shared.add(TrimThreadJob(threadId))
}
private fun handleDeletedMessages(openGroupID: String, deletedMessageServerIDs: List<Long>) {
private fun handleDeletedMessages(room: String, openGroupID: String, deletions: List<OpenGroupAPIV2.MessageDeletion>) {
val storage = MessagingModuleConfiguration.shared.storage
val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())
val threadID = storage.getThreadId(Address.fromSerialized(groupID)) ?: return
val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { serverID ->
val messageID = dataProvider.getMessageID(serverID, threadID)
val deletedMessageIDs = deletions.mapNotNull { deletion ->
val messageID = dataProvider.getMessageID(deletion.deletedMessageServerID, threadID)
if (messageID == null) {
Log.d("Loki", "Couldn't find message ID for message with serverID: $serverID.")
Log.d("Loki", "Couldn't find message ID for message with serverID: ${deletion.deletedMessageServerID}.")
}
messageID
}
deletedMessageIDs.forEach { (messageId, isSms) ->
MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms)
}
val currentMax = storage.getLastDeletionServerID(room, server) ?: 0L
val latestMax = deletions.map { it.id }.maxOrNull() ?: 0L
if (latestMax > currentMax && latestMax != 0L) {
storage.setLastDeletionServerID(room, server, latestMax)
}
}
}

View File

@@ -430,7 +430,7 @@ object OnionRequestAPI {
/**
* Sends an onion request to `snode`. Builds new paths as needed.
*/
internal fun sendOnionRequest(method: Snode.Method, parameters: Map<*, *>, snode: Snode, publicKey: String): Promise<Map<*, *>, Exception> {
internal fun sendOnionRequest(method: Snode.Method, parameters: Map<*, *>, snode: Snode, publicKey: String? = null): Promise<Map<*, *>, Exception> {
val payload = mapOf( "method" to method.rawValue, "params" to parameters )
return sendOnionRequest(Destination.Snode(snode), payload).recover { exception ->
val httpRequestFailedException = exception as? HTTP.HTTPRequestFailedException

View File

@@ -3,24 +3,29 @@
package org.session.libsession.snode
import android.os.Build
import com.goterl.lazysodium.LazySodiumAndroid
import com.goterl.lazysodium.SodiumAndroid
import com.goterl.lazysodium.exceptions.SodiumException
import com.goterl.lazysodium.interfaces.AEAD
import com.goterl.lazysodium.interfaces.GenericHash
import com.goterl.lazysodium.interfaces.PwHash
import com.goterl.lazysodium.interfaces.SecretBox
import com.goterl.lazysodium.utils.Key
import nl.komponents.kovenant.*
import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map
import org.session.libsession.messaging.utilities.MessageWrapper
import org.session.libsignal.crypto.getRandomElement
import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.utilities.Snode
import org.session.libsignal.utilities.HTTP
import org.session.libsignal.database.LokiAPIDatabaseProtocol
import org.session.libsignal.utilities.Broadcaster
import org.session.libsignal.utilities.prettifiedDescription
import org.session.libsignal.utilities.removing05PrefixIfNeeded
import org.session.libsignal.utilities.retryIfNeeded
import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.utilities.*
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Base64
import java.security.SecureRandom
import java.util.*
object SnodeAPI {
private val sodium by lazy { LazySodiumAndroid(SodiumAndroid()) }
private val database: LokiAPIDatabaseProtocol
get() = SnodeModule.shared.storage
private val broadcaster: Broadcaster
@@ -54,10 +59,14 @@ object SnodeAPI {
internal sealed class Error(val description: String) : Exception(description) {
object Generic : Error("An error occurred.")
object ClockOutOfSync : Error("Your clock is out of sync with the Service Node network.")
// ONS
object DecryptionFailed : Error("Couldn't decrypt ONS name.")
object HashingFailed : Error("Couldn't compute ONS name hash.")
object ValidationFailed : Error("ONS name validation failed.")
}
// Internal API
internal fun invoke(method: Snode.Method, snode: Snode, publicKey: String, parameters: Map<String, String>): RawResponsePromise {
internal fun invoke(method: Snode.Method, snode: Snode, publicKey: String? = null, parameters: Map<String, Any>): RawResponsePromise {
val url = "${snode.address}:${snode.port}/storage_rpc/v1"
if (useOnionRequests) {
return OnionRequestAPI.sendOnionRequest(method, parameters, snode, publicKey)
@@ -153,6 +162,91 @@ object SnodeAPI {
}
// Public API
fun getSessionIDFor(onsName: String): Promise<String, Exception> {
val deferred = deferred<String, Exception>()
val promise = deferred.promise
val validationCount = 3
val sessionIDByteCount = 33
// Hash the ONS name using BLAKE2b
val onsName = onsName.toLowerCase(Locale.ENGLISH)
val nameAsData = onsName.toByteArray()
val nameHash = ByteArray(GenericHash.BYTES)
if (!sodium.cryptoGenericHash(nameHash, nameHash.size, nameAsData, nameAsData.size.toLong())) {
deferred.reject(Error.HashingFailed)
return promise
}
val base64EncodedNameHash = Base64.encodeBytes(nameHash)
// Ask 3 different snodes for the Session ID associated with the given name hash
val parameters = mapOf(
"endpoint" to "ons_resolve",
"params" to mapOf( "type" to 0, "name_hash" to base64EncodedNameHash )
)
val promises = (1..validationCount).map {
getRandomSnode().bind { snode ->
retryIfNeeded(maxRetryCount) {
invoke(Snode.Method.OxenDaemonRPCCall, snode, null, parameters)
}
}
}
all(promises).success { results ->
val sessionIDs = mutableListOf<String>()
for (json in results) {
val intermediate = json["result"] as? Map<*, *>
val hexEncodedCiphertext = intermediate?.get("encrypted_value") as? String
if (hexEncodedCiphertext != null) {
val ciphertext = Hex.fromStringCondensed(hexEncodedCiphertext)
val isArgon2Based = (intermediate["nonce"] == null)
if (isArgon2Based) {
// Handle old Argon2-based encryption used before HF16
val salt = ByteArray(PwHash.SALTBYTES)
val key: ByteArray
val nonce = ByteArray(SecretBox.NONCEBYTES)
val sessionIDAsData = ByteArray(sessionIDByteCount)
try {
key = Key.fromHexString(sodium.cryptoPwHash(onsName, SecretBox.KEYBYTES, salt, PwHash.OPSLIMIT_MODERATE, PwHash.MEMLIMIT_MODERATE, PwHash.Alg.PWHASH_ALG_ARGON2ID13)).asBytes
} catch (e: SodiumException) {
deferred.reject(Error.HashingFailed)
return@success
}
if (!sodium.cryptoSecretBoxOpenEasy(sessionIDAsData, ciphertext, ciphertext.size.toLong(), nonce, key)) {
deferred.reject(Error.DecryptionFailed)
return@success
}
sessionIDs.add(Hex.toStringCondensed(sessionIDAsData))
} else {
val hexEncodedNonce = intermediate["nonce"] as? String
if (hexEncodedNonce == null) {
deferred.reject(Error.Generic)
return@success
}
val nonce = Hex.fromStringCondensed(hexEncodedNonce)
val key = ByteArray(GenericHash.BYTES)
if (!sodium.cryptoGenericHash(key, key.size, nameAsData, nameAsData.size.toLong(), nameHash, nameHash.size)) {
deferred.reject(Error.HashingFailed)
return@success
}
val sessionIDAsData = ByteArray(sessionIDByteCount)
if (!sodium.cryptoAeadXChaCha20Poly1305IetfDecrypt(sessionIDAsData, null, null, ciphertext, ciphertext.size.toLong(), null, 0, nonce, key)) {
deferred.reject(Error.DecryptionFailed)
return@success
}
sessionIDs.add(Hex.toStringCondensed(sessionIDAsData))
}
} else {
deferred.reject(Error.Generic)
return@success
}
}
if (sessionIDs.size == validationCount && sessionIDs.toSet().size == 1) {
deferred.resolve(sessionIDs.first())
} else {
deferred.reject(Error.ValidationFailed)
}
}
return promise
}
fun getTargetSnodes(publicKey: String): Promise<List<Snode>, Exception> {
// SecureRandom() should be cryptographically secure
return getSwarm(publicKey).map { it.shuffled(SecureRandom()).take(targetSwarmSnodeCount) }

View File

@@ -12,7 +12,6 @@ import android.os.Handler;
* See http://rxmarbles.com/#debounce
*/
public class Debouncer {
private final Handler handler;
private final long threshold;

View File

@@ -321,6 +321,7 @@ object Util {
@JvmStatic
fun getBoldedString(value: String?): CharSequence {
if (value.isNullOrEmpty()) { return "" }
val spanned = SpannableString(value)
spanned.setSpan(StyleSpan(Typeface.BOLD), 0,
spanned.length,