Config revamp

This commit is contained in:
SessionHero01
2024-10-01 17:20:50 +10:00
parent e9e67068cd
commit 45a66d0eea
29 changed files with 1083 additions and 963 deletions

View File

@@ -1,133 +1,255 @@
package org.session.libsession.messaging.configs
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import network.loki.messenger.libsession_util.MutableConfig
import network.loki.messenger.libsession_util.util.ConfigPush
import org.session.libsession.database.StorageProtocol
import org.session.libsession.database.userAuth
import org.session.libsession.snode.OwnedSwarmAuth
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeMessage
import org.session.libsession.snode.SwarmAuth
import org.session.libsession.snode.model.StoreMessageResponse
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.ConfigPushResult
import org.session.libsession.utilities.ConfigUpdateNotification
import org.session.libsession.utilities.UserConfigType
import org.session.libsession.utilities.getClosedGroup
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import org.session.libsignal.utilities.Snode
import javax.inject.Inject
class ConfigSyncHandler(
private const val TAG = "ConfigSyncHandler"
/**
* This class is responsible for sending the local config changes to the swarm.
*
* It does so by listening for changes in the config factory.
*/
class ConfigSyncHandler @Inject constructor(
private val configFactory: ConfigFactoryProtocol,
private val storageProtocol: StorageProtocol,
@Suppress("OPT_IN_USAGE") scope: CoroutineScope = GlobalScope,
) {
init {
scope.launch {
private var job: Job? = null
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun start() {
require(job == null) { "Already started" }
job = GlobalScope.launch {
val groupDispatchers = hashMapOf<AccountId, CoroutineDispatcher>()
val userConfigDispatcher = Dispatchers.Default.limitedParallelism(1)
configFactory.configUpdateNotifications.collect { changes ->
try {
when (changes) {
is ConfigUpdateNotification.GroupConfigsDeleted -> {}
is ConfigUpdateNotification.GroupConfigsUpdated -> {
pushGroupConfigsChangesIfNeeded(changes.groupId)
is ConfigUpdateNotification.GroupConfigsDeleted -> {
groupDispatchers.remove(changes.groupId)
}
is ConfigUpdateNotification.GroupConfigsUpdated -> {
// Group config pushing is limited to its own dispatcher
launch(groupDispatchers.getOrPut(changes.groupId) {
Dispatchers.Default.limitedParallelism(1)
}) {
pushGroupConfigsChangesIfNeeded(changes.groupId)
}
}
ConfigUpdateNotification.UserConfigs -> launch(userConfigDispatcher) {
pushUserConfigChangesIfNeeded()
}
ConfigUpdateNotification.UserConfigs -> pushUserConfigChangesIfNeeded()
}
} catch (e: Exception) {
Log.e("ConfigSyncHandler", "Error handling config update", e)
Log.e(TAG, "Error handling config update", e)
}
}
}
}
private suspend fun pushGroupConfigsChangesIfNeeded(groupId: AccountId): Unit = coroutineScope {
}
private suspend fun pushUserConfigChangesIfNeeded(): Unit = coroutineScope {
private suspend fun pushGroupConfigsChangesIfNeeded(groupId: AccountId) = coroutineScope {
// Only admin can push group configs
val adminKey = configFactory.getClosedGroup(groupId)?.adminKey
if (adminKey == null) {
Log.i(TAG, "Skipping group config push without admin key")
return@coroutineScope
}
// Gather data to push
val (membersPush, infoPush, keysPush) = configFactory.withMutableGroupConfigs(groupId) { configs ->
val membersPush = if (configs.groupMembers.needsPush()) {
configs.groupMembers.push()
} else {
null
}
val infoPush = if (configs.groupInfo.needsPush()) {
configs.groupInfo.push()
} else {
null
}
Triple(membersPush, infoPush, configs.groupKeys.pendingConfig())
}
// Nothing to push?
if (membersPush == null && infoPush == null && keysPush == null) {
return@coroutineScope
}
Log.d(TAG, "Pushing group configs")
val snode = SnodeAPI.getSingleTargetSnode(groupId.hexString).await()
val auth = OwnedSwarmAuth.ofClosedGroup(groupId, adminKey)
// Spawn the config pushing concurrently
val membersConfigHashTask = membersPush?.let {
async {
membersPush to pushConfig(
auth,
snode,
membersPush,
Namespace.CLOSED_GROUP_MEMBERS()
)
}
}
val infoConfigHashTask = infoPush?.let {
async {
infoPush to pushConfig(auth, snode, infoPush, Namespace.CLOSED_GROUP_INFO())
}
}
// Keys push is different: it doesn't have the delete call so we don't call pushConfig
val keysPushResult = keysPush?.let {
SnodeAPI.sendBatchRequest(
snode = snode,
publicKey = auth.accountId.hexString,
request = SnodeAPI.buildAuthenticatedStoreBatchInfo(
Namespace.ENCRYPTION_KEYS(),
SnodeMessage(
auth.accountId.hexString,
Base64.encodeBytes(keysPush),
SnodeMessage.CONFIG_TTL,
SnodeAPI.nowWithOffset,
),
auth
),
responseType = StoreMessageResponse::class.java
).toConfigPushResult()
}
// Wait for all other config push to come back
val memberPushResult = membersConfigHashTask?.await()
val infoPushResult = infoConfigHashTask?.await()
configFactory.confirmGroupConfigsPushed(
groupId,
memberPushResult,
infoPushResult,
keysPushResult
)
Log.i(
TAG,
"Pushed group configs, " +
"info = ${infoPush != null}, " +
"members = ${membersPush != null}, " +
"keys = ${keysPush != null}"
)
}
private suspend fun pushConfig(
auth: SwarmAuth,
snode: Snode,
push: ConfigPush,
namespace: Int
): ConfigPushResult {
val response = SnodeAPI.sendBatchRequest(
snode = snode,
publicKey = auth.accountId.hexString,
request = SnodeAPI.buildAuthenticatedStoreBatchInfo(
namespace,
SnodeMessage(
auth.accountId.hexString,
Base64.encodeBytes(push.config),
SnodeMessage.CONFIG_TTL,
SnodeAPI.nowWithOffset,
),
auth,
),
responseType = StoreMessageResponse::class.java
)
if (push.obsoleteHashes.isNotEmpty()) {
SnodeAPI.sendBatchRequest(
snode = snode,
publicKey = auth.accountId.hexString,
request = SnodeAPI.buildAuthenticatedDeleteBatchInfo(auth, push.obsoleteHashes)
)
}
return response.toConfigPushResult()
}
private suspend fun pushUserConfigChangesIfNeeded() = coroutineScope {
val userAuth = requireNotNull(storageProtocol.userAuth) {
"Current user not available"
}
data class PushInformation(
val namespace: Int,
val configClass: Class<out MutableConfig>,
val push: ConfigPush,
)
// Gather all the user configs that need to be pushed
val pushes = configFactory.withMutableUserConfigs { configs ->
configs.allConfigs()
.filter { it.needsPush() }
.map { config ->
PushInformation(
namespace = config.namespace(),
configClass = config.javaClass,
push = config.push(),
)
UserConfigType.entries
.mapNotNull { type ->
val config = configs.getConfig(type)
if (!config.needsPush()) {
return@mapNotNull null
}
type to config.push()
}
.toList()
}
Log.d("ConfigSyncHandler", "Pushing ${pushes.size} configs")
if (pushes.isEmpty()) {
return@coroutineScope
}
Log.d(TAG, "Pushing ${pushes.size} user configs")
val snode = SnodeAPI.getSingleTargetSnode(userAuth.accountId.hexString).await()
val pushTasks = pushes.map { info ->
val calls = buildList {
this += SnodeAPI.buildAuthenticatedStoreBatchInfo(
info.namespace,
SnodeMessage(
userAuth.accountId.hexString,
Base64.encodeBytes(info.push.config),
SnodeMessage.CONFIG_TTL,
SnodeAPI.nowWithOffset,
),
userAuth
)
if (info.push.obsoleteHashes.isNotEmpty()) {
this += SnodeAPI.buildAuthenticatedDeleteBatchInfo(
messageHashes = info.push.obsoleteHashes,
auth = userAuth,
)
}
}
val pushTasks = pushes.map { (configType, configPush) ->
async {
val responses = SnodeAPI.getBatchResponse(
snode = snode,
publicKey = userAuth.accountId.hexString,
requests = calls,
sequence = true
)
val firstError = responses.results.firstOrNull { !it.isSuccessful }
check(firstError == null) {
"Failed to push config change due to error: ${firstError?.body}"
}
val hash = responses.results.first().body.get("hash").asText()
require(hash.isNotEmpty()) {
"Missing server hash for pushed config"
}
info to hash
(configType to configPush) to pushConfig(userAuth, snode, configPush, configType.namespace)
}
}
val pushResults = pushTasks.awaitAll().associateBy { it.first.configClass }
val pushResults = pushTasks.awaitAll().associate { it.first.first to (it.first.second to it.second) }
Log.d("ConfigSyncHandler", "Pushed ${pushResults.size} configs")
Log.d(TAG, "Pushed ${pushResults.size} user configs")
configFactory.withMutableUserConfigs { configs ->
configs.allConfigs()
.mapNotNull { config -> pushResults[config.javaClass]?.let { Triple(config, it.first, it.second) } }
.forEach { (config, info, hash) ->
config.confirmPushed(info.push.seqNo, hash)
}
}
configFactory.confirmUserConfigsPushed(
contacts = pushResults[UserConfigType.CONTACTS],
userGroups = pushResults[UserConfigType.USER_GROUPS],
convoInfoVolatile = pushResults[UserConfigType.CONVO_INFO_VOLATILE],
userProfile = pushResults[UserConfigType.USER_PROFILE]
)
}
private fun StoreMessageResponse.toConfigPushResult(): ConfigPushResult {
return ConfigPushResult(hash, timestamp)
}
}

View File

@@ -3,9 +3,12 @@ package org.session.libsession.messaging.groups
import android.os.SystemClock
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.launch
import network.loki.messenger.libsession_util.ReadableGroupKeysConfig
@@ -19,24 +22,34 @@ import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeMessage
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateMessage
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import javax.inject.Inject
private const val TAG = "RemoveGroupMemberHandler"
private const val MIN_PROCESS_INTERVAL_MILLS = 1_000L
class RemoveGroupMemberHandler(
class RemoveGroupMemberHandler @Inject constructor(
private val configFactory: ConfigFactoryProtocol,
private val scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private val textSecurePreferences: TextSecurePreferences,
) {
init {
scope.launch {
private val scope: CoroutineScope = GlobalScope
private var job: Job? = null
fun start() {
require(job == null) { "Already started" }
job = scope.launch {
while (true) {
// Make sure we have a local number before we start processing
textSecurePreferences.watchLocalNumber().first { it != null }
val processStartedAt = SystemClock.uptimeMillis()
try {

View File

@@ -118,6 +118,7 @@ class JobQueue : JobDelegate {
while (isActive) {
when (val job = queue.receive()) {
is InviteContactsJob,
is NotifyPNServerJob,
is AttachmentUploadJob,
is GroupLeavingJob,
@@ -226,6 +227,7 @@ class JobQueue : JobDelegate {
OpenGroupDeleteJob.KEY,
RetrieveProfileAvatarJob.KEY,
GroupLeavingJob.KEY,
InviteContactsJob.KEY,
LibSessionGroupLeavingJob.KEY
)
allJobTypes.forEach { type ->

View File

@@ -9,7 +9,6 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import network.loki.messenger.libsession_util.util.GroupInfo
import network.loki.messenger.libsession_util.util.Sodium
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.groups.GroupManagerV2
@@ -19,10 +18,13 @@ import org.session.libsession.messaging.jobs.MessageReceiveParameters
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.model.RetrieveMessageResponse
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.ConfigMessage
import org.session.libsession.utilities.getClosedGroup
import org.session.libsignal.database.LokiAPIDatabaseProtocol
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import org.session.libsignal.utilities.Snode
@@ -36,37 +38,12 @@ class ClosedGroupPoller(
private val configFactoryProtocol: ConfigFactoryProtocol,
private val groupManagerV2: GroupManagerV2,
private val storage: StorageProtocol,
private val lokiApiDatabase: LokiAPIDatabaseProtocol,
) {
data class ParsedRawMessage(
val data: ByteArray,
val hash: String,
val timestamp: Long
) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as ParsedRawMessage
if (!data.contentEquals(other.data)) return false
if (hash != other.hash) return false
if (timestamp != other.timestamp) return false
return true
}
override fun hashCode(): Int {
var result = data.contentHashCode()
result = 31 * result + hash.hashCode()
result = 31 * result + timestamp.hashCode()
return result
}
}
companion object {
const val POLL_INTERVAL = 3_000L
const val ENABLE_LOGGING = false
private const val POLL_INTERVAL = 3_000L
private const val TAG = "ClosedGroupPoller"
}
private var job: Job? = null
@@ -74,26 +51,36 @@ class ClosedGroupPoller(
fun start() {
if (job?.isActive == true) return // already started, don't restart
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Starting closed group poller for ${closedGroupSessionId.hexString.take(4)}")
Log.d(TAG, "Starting closed group poller for ${closedGroupSessionId.hexString.take(4)}")
job?.cancel()
job = scope.launch(executor) {
var snode: Snode? = null
while (isActive) {
val group = configFactoryProtocol.withUserConfigs { it.userGroups.getClosedGroup(closedGroupSessionId.hexString) } ?: break
val nextPoll = runCatching { poll(group) }
configFactoryProtocol.getClosedGroup(closedGroupSessionId) ?: break
if (snode == null) {
Log.i(TAG, "No Snode, fetching one")
snode = SnodeAPI.getSingleTargetSnode(closedGroupSessionId.hexString).await()
}
val nextPoll = runCatching { poll(snode!!) }
when {
nextPoll.isFailure -> {
Log.e("ClosedGroupPoller", "Error polling closed group", nextPoll.exceptionOrNull())
Log.e(TAG, "Error polling closed group", nextPoll.exceptionOrNull())
// Clearing snode so we get a new one next time
snode = null
delay(POLL_INTERVAL)
}
nextPoll.getOrNull() == null -> {
// assume null poll time means don't continue polling, either the group has been deleted or something else
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Stopping the closed group poller")
Log.d(TAG, "Stopping the closed group poller")
break
}
else -> {
delay(nextPoll.getOrThrow()!!)
delay(POLL_INTERVAL)
}
}
}
@@ -105,10 +92,9 @@ class ClosedGroupPoller(
job = null
}
private suspend fun poll(group: GroupInfo.ClosedGroupInfo): Long? = coroutineScope {
val snode = SnodeAPI.getSingleTargetSnode(closedGroupSessionId.hexString).await()
val groupAuth = configFactoryProtocol.getGroupAuth(closedGroupSessionId) ?: return@coroutineScope null
private suspend fun poll(snode: Snode): Unit = coroutineScope {
val groupAuth =
configFactoryProtocol.getGroupAuth(closedGroupSessionId) ?: return@coroutineScope
val configHashesToExtends = configFactoryProtocol.withGroupConfigs(closedGroupSessionId) {
buildSet {
addAll(it.groupKeys.currentHashes())
@@ -117,91 +103,36 @@ class ClosedGroupPoller(
}
}
val adminKey = requireNotNull(configFactoryProtocol.withUserConfigs { it.userGroups.getClosedGroup(closedGroupSessionId.hexString) }) {
val adminKey = requireNotNull(configFactoryProtocol.withUserConfigs {
it.userGroups.getClosedGroup(closedGroupSessionId.hexString)
}) {
"Group doesn't exist"
}.adminKey
val pollingTasks = mutableListOf<Pair<String, Deferred<*>>>()
pollingTasks += "Poll revoked messages" to async {
pollingTasks += "retrieving revoked messages" to async {
handleRevoked(
SnodeAPI.sendBatchRequest(
snode,
closedGroupSessionId.hexString,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode,
closedGroupSessionId.hexString,
Namespace.REVOKED_GROUP_MESSAGES()
).orEmpty(),
auth = groupAuth,
namespace = Namespace.REVOKED_GROUP_MESSAGES(),
maxSize = null,
),
Map::class.java
RetrieveMessageResponse::class.java
)
)
}
pollingTasks += "Poll group messages" to async {
handleMessages(
body = SnodeAPI.sendBatchRequest(
snode,
closedGroupSessionId.hexString,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = groupAuth,
namespace = Namespace.CLOSED_GROUP_MESSAGES(),
maxSize = null,
),
Map::class.java),
snode = snode,
)
}
pollingTasks += "Poll group keys config" to async {
handleKeyPoll(
response = SnodeAPI.sendBatchRequest(
snode,
closedGroupSessionId.hexString,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = groupAuth,
namespace = Namespace.ENCRYPTION_KEYS(),
maxSize = null,
),
Map::class.java),
)
}
pollingTasks += "Poll group info config" to async {
handleInfo(
response = SnodeAPI.sendBatchRequest(
snode,
closedGroupSessionId.hexString,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = groupAuth,
namespace = Namespace.CLOSED_GROUP_INFO(),
maxSize = null,
),
Map::class.java),
)
}
pollingTasks += "Poll group members config" to async {
handleMembers(
SnodeAPI.sendBatchRequest(
snode,
closedGroupSessionId.hexString,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = groupAuth,
namespace = Namespace.CLOSED_GROUP_MEMBERS(),
maxSize = null,
),
Map::class.java),
)
}
if (configHashesToExtends.isNotEmpty() && adminKey != null) {
pollingTasks += "Extend group config TTL" to async {
pollingTasks += "extending group config TTL" to async {
SnodeAPI.sendBatchRequest(
snode,
closedGroupSessionId.hexString,
@@ -215,52 +146,105 @@ class ClosedGroupPoller(
}
}
val groupMessageRetrieval = async {
SnodeAPI.sendBatchRequest(
snode = snode,
publicKey = closedGroupSessionId.hexString,
request = SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode,
closedGroupSessionId.hexString,
Namespace.CLOSED_GROUP_MESSAGES()
).orEmpty(),
auth = groupAuth,
namespace = Namespace.CLOSED_GROUP_MESSAGES(),
maxSize = null,
),
responseType = Map::class.java
)
}
val groupConfigRetrieval = listOf(
Namespace.ENCRYPTION_KEYS(),
Namespace.CLOSED_GROUP_INFO(),
Namespace.CLOSED_GROUP_MEMBERS()
).map { ns ->
async {
SnodeAPI.sendBatchRequest(
snode = snode,
publicKey = closedGroupSessionId.hexString,
request = SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode,
closedGroupSessionId.hexString,
ns
).orEmpty(),
auth = groupAuth,
namespace = ns,
maxSize = null,
),
responseType = RetrieveMessageResponse::class.java
)
}
}
// The retrieval of the config and regular messages can be done concurrently,
// however, in order for the messages to be able to be decrypted, the config messages
// must be processed first.
pollingTasks += "polling and handling group config keys and messages" to async {
val (keysMessage, infoMessage, membersMessage) = groupConfigRetrieval.map { it.await() }
saveLastMessageHash(snode, keysMessage, Namespace.ENCRYPTION_KEYS())
saveLastMessageHash(snode, infoMessage, Namespace.CLOSED_GROUP_INFO())
saveLastMessageHash(snode, membersMessage, Namespace.CLOSED_GROUP_MEMBERS())
handleGroupConfigMessages(keysMessage, infoMessage, membersMessage)
val regularMessages = groupMessageRetrieval.await()
handleMessages(regularMessages, snode)
}
// Wait for all tasks to complete, gather any exceptions happened during polling
val errors = pollingTasks.mapNotNull { (name, task) ->
runCatching { task.await() }
.exceptionOrNull()
?.takeIf { it !is CancellationException }
?.let { RuntimeException("Error executing: $name", it) }
?.let { RuntimeException("Error $name", it) }
}
// If there were any errors, throw the first one and add the rest as "suppressed" exceptions
if (errors.isNotEmpty()) {
throw PollerException("Error polling closed group", errors)
}
POLL_INTERVAL // this might change in future
}
private fun parseMessages(body: RawResponse): List<ParsedRawMessage> {
val messages = body["messages"] as? List<*> ?: return emptyList()
return messages.mapNotNull { messageMap ->
val rawMessageAsJSON = messageMap as? Map<*, *> ?: return@mapNotNull null
val base64EncodedData = rawMessageAsJSON["data"] as? String ?: return@mapNotNull null
val hash = rawMessageAsJSON["hash"] as? String ?: return@mapNotNull null
val timestamp = rawMessageAsJSON["timestamp"] as? Long ?: return@mapNotNull null
val data = base64EncodedData.let { Base64.decode(it) }
ParsedRawMessage(data, hash, timestamp)
throw errors.first().apply {
for (index in 1 until errors.size) {
addSuppressed(errors[index])
}
}
}
}
private suspend fun handleRevoked(body: RawResponse) {
// This shouldn't ever return null at this point
val messages = body["messages"] as? List<*>
?: return Log.w("GroupPoller", "body didn't contain a list of messages")
messages.forEach { messageMap ->
val rawMessageAsJSON = messageMap as? Map<*,*>
?: return@forEach Log.w("GroupPoller", "rawMessage wasn't a map as expected")
val data = rawMessageAsJSON["data"] as? String ?: return@forEach
val hash = rawMessageAsJSON["hash"] as? String ?: return@forEach
val timestamp = rawMessageAsJSON["timestamp"] as? Long ?: return@forEach
Log.d("GroupPoller", "Handling message with hash $hash")
private fun RetrieveMessageResponse.Message.toConfigMessage(): ConfigMessage {
return ConfigMessage(hash, data, timestamp)
}
private fun saveLastMessageHash(snode: Snode, body: RetrieveMessageResponse, namespace: Int) {
if (body.messages.isNotEmpty()) {
lokiApiDatabase.setLastMessageHashValue(
snode = snode,
publicKey = closedGroupSessionId.hexString,
newValue = body.messages.last().hash,
namespace = namespace
)
}
}
private suspend fun handleRevoked(body: RetrieveMessageResponse) {
body.messages.forEach { msg ->
val decoded = configFactoryProtocol.maybeDecryptForUser(
Base64.decode(data),
msg.data,
Sodium.KICKED_DOMAIN,
closedGroupSessionId,
)
if (decoded != null) {
Log.d("GroupPoller", "decoded kick message was for us")
Log.d(TAG, "decoded kick message was for us")
val message = decoded.decodeToString()
if (Sodium.KICKED_REGEX.matches(message)) {
val (sessionId, generation) = message.split("-")
@@ -279,44 +263,31 @@ class ClosedGroupPoller(
}
}
}
}
}
private fun handleKeyPoll(response: RawResponse) {
// get all the data to hash objects and process them
val allMessages = parseMessages(response)
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Total key messages this poll: ${allMessages.size}")
var total = 0
allMessages.forEach { (message, hash, timestamp) ->
configFactoryProtocol.withMutableGroupConfigs(closedGroupSessionId) { configs ->
if (configs.loadKeys(message, hash, timestamp)) {
total++
}
}
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Merged $hash for keys on ${closedGroupSessionId.hexString}")
private fun handleGroupConfigMessages(
keysResponse: RetrieveMessageResponse,
infoResponse: RetrieveMessageResponse,
membersResponse: RetrieveMessageResponse
) {
if (keysResponse.messages.isEmpty() && infoResponse.messages.isEmpty() && membersResponse.messages.isEmpty()) {
return
}
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Total key messages consumed: $total")
}
private fun handleInfo(response: RawResponse) {
val messages = parseMessages(response)
messages.forEach { (message, hash, _) ->
configFactoryProtocol.withMutableGroupConfigs(closedGroupSessionId) { configs ->
configs.groupInfo.merge(arrayOf(hash to message))
}
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Merged $hash for info on ${closedGroupSessionId.hexString}")
}
}
Log.d(
TAG, "Handling group config messages(" +
"info = ${infoResponse.messages.size}, " +
"keys = ${keysResponse.messages.size}, " +
"members = ${membersResponse.messages.size})"
)
private fun handleMembers(response: RawResponse) {
parseMessages(response).forEach { (message, hash, _) ->
configFactoryProtocol.withMutableGroupConfigs(closedGroupSessionId) { configs ->
configs.groupMembers.merge(arrayOf(hash to message))
}
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Merged $hash for members on ${closedGroupSessionId.hexString}")
}
configFactoryProtocol.mergeGroupConfigMessages(
groupId = closedGroupSessionId,
keys = keysResponse.messages.map { it.toConfigMessage() },
info = infoResponse.messages.map { it.toConfigMessage() },
members = membersResponse.messages.map { it.toConfigMessage() },
)
}
private fun handleMessages(body: RawResponse, snode: Snode) {
@@ -342,8 +313,8 @@ class ClosedGroupPoller(
JobQueue.shared.add(job)
}
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "namespace for messages rx count: ${messages.size}")
if (messages.isNotEmpty()) {
Log.d(TAG, "namespace for messages rx count: ${messages.size}")
}
}
}

View File

@@ -3,25 +3,14 @@ package org.session.libsession.messaging.sending_receiving.pollers
import android.util.SparseArray
import androidx.core.util.valueIterator
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import network.loki.messenger.libsession_util.ConfigBase
import network.loki.messenger.libsession_util.Contacts
import network.loki.messenger.libsession_util.ConversationVolatileConfig
import network.loki.messenger.libsession_util.MutableConfig
import network.loki.messenger.libsession_util.MutableContacts
import network.loki.messenger.libsession_util.MutableConversationVolatileConfig
import network.loki.messenger.libsession_util.MutableUserGroupsConfig
import network.loki.messenger.libsession_util.MutableUserProfile
import network.loki.messenger.libsession_util.UserGroupsConfig
import network.loki.messenger.libsession_util.UserProfile
import nl.komponents.kovenant.Deferred
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.resolve
import nl.komponents.kovenant.task
import org.session.libsession.database.StorageProtocol
import org.session.libsession.database.userAuth
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.BatchMessageReceiveJob
@@ -31,8 +20,9 @@ import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeModule
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.Contact.Name
import org.session.libsession.utilities.MutableGroupConfigs
import org.session.libsession.utilities.ConfigMessage
import org.session.libsession.utilities.UserConfigType
import org.session.libsignal.database.LokiAPIDatabaseProtocol
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
@@ -46,8 +36,14 @@ private const val TAG = "Poller"
private class PromiseCanceledException : Exception("Promise canceled.")
class Poller(private val configFactory: ConfigFactoryProtocol) {
var userPublicKey = MessagingModuleConfiguration.shared.storage.getUserPublicKey() ?: ""
class Poller(
private val configFactory: ConfigFactoryProtocol,
private val storage: StorageProtocol,
private val lokiApiDatabase: LokiAPIDatabaseProtocol
) {
private val userPublicKey: String
get() = storage.getUserPublicKey().orEmpty()
private var hasStarted: Boolean = false
private val usedSnodes: MutableSet<Snode> = mutableSetOf()
var isCaughtUp = false
@@ -74,12 +70,14 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
}
fun retrieveUserProfile() {
Log.d(TAG, "Retrieving user profile.")
Log.d(TAG, "Retrieving user profile. for key = $userPublicKey")
SnodeAPI.getSwarm(userPublicKey).bind {
usedSnodes.clear()
deferred<Unit, Exception>().also {
pollNextSnode(userProfileOnly = true, it)
}.promise
}.fail {
Log.e(TAG, "Failed to retrieve user profile.", it)
}
}
// endregion
@@ -144,8 +142,9 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
}
}
private fun processConfig(snode: Snode, rawMessages: RawResponse, namespace: Int, forConfig: Class<out MutableConfig>) {
private fun processConfig(snode: Snode, rawMessages: RawResponse, forConfig: UserConfigType) {
val messages = rawMessages["messages"] as? List<*>
val namespace = forConfig.namespace
val processed = if (!messages.isNullOrEmpty()) {
SnodeAPI.updateLastMessageHashValueIfPossible(snode, userPublicKey, messages, namespace)
SnodeAPI.removeDuplicates(userPublicKey, messages, namespace, true).mapNotNull { rawMessageAsJSON ->
@@ -153,24 +152,21 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
val b64EncodedBody = rawMessageAsJSON["data"] as? String ?: return@mapNotNull null
val timestamp = rawMessageAsJSON["t"] as? Long ?: SnodeAPI.nowWithOffset
val body = Base64.decode(b64EncodedBody)
Triple(body, hashValue, timestamp)
ConfigMessage(data = body, hash = hashValue, timestamp = timestamp)
}
} else emptyList()
if (processed.isEmpty()) return
processed.forEach { (body, hash, _) ->
try {
configFactory.withMutableUserConfigs { configs ->
configs
.allConfigs()
.filter { it.javaClass.isInstance(forConfig) }
.first()
.merge(arrayOf(hash to body))
}
} catch (e: Exception) {
Log.e(TAG, e)
}
Log.i(TAG, "Processing ${processed.size} messages for $forConfig")
try {
configFactory.mergeUserConfigs(
userConfigType = forConfig,
messages = processed,
)
} catch (e: Exception) {
Log.e(TAG, e)
}
}
@@ -182,57 +178,57 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
}
private fun pollUserProfile(snode: Snode, deferred: Deferred<Unit, Exception>): Promise<Unit, Exception> = task {
runBlocking(Dispatchers.IO) {
val requests = mutableListOf<SnodeAPI.SnodeBatchRequestInfo>()
val hashesToExtend = mutableSetOf<String>()
val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth)
val requests = mutableListOf<SnodeAPI.SnodeBatchRequestInfo>()
val hashesToExtend = mutableSetOf<String>()
val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth)
configFactory.withUserConfigs {
val config = it.userProfile
hashesToExtend += config.currentHashes()
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
configFactory.withUserConfigs {
hashesToExtend += it.userProfile.currentHashes()
}
requests += SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode = snode,
publicKey = userAuth.accountId.hexString,
namespace = Namespace.USER_PROFILE()
),
auth = userAuth,
namespace = Namespace.USER_PROFILE(),
maxSize = -8
)
if (hashesToExtend.isNotEmpty()) {
SnodeAPI.buildAuthenticatedAlterTtlBatchRequest(
messageHashes = hashesToExtend.toList(),
auth = userAuth,
namespace = config.namespace(),
maxSize = -8
)
}.let { request ->
requests += request
newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds,
extend = true
).let { extensionRequest ->
requests += extensionRequest
}
}
if (hashesToExtend.isNotEmpty()) {
SnodeAPI.buildAuthenticatedAlterTtlBatchRequest(
messageHashes = hashesToExtend.toList(),
auth = userAuth,
newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds,
extend = true
).let { extensionRequest ->
requests += extensionRequest
}
}
if (requests.isNotEmpty()) {
SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses ->
isCaughtUp = true
if (!deferred.promise.isDone()) {
val responseList = (rawResponses["results"] as List<RawResponse>)
responseList.getOrNull(0)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) {
Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
if (requests.isNotEmpty()) {
SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses ->
isCaughtUp = true
if (!deferred.promise.isDone()) {
val responseList = (rawResponses["results"] as List<RawResponse>)
responseList.getOrNull(0)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) {
Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
} else {
val body = rawResponse["body"] as? RawResponse
if (body == null) {
Log.e(TAG, "Batch sub-request didn't contain a body")
} else {
val body = rawResponse["body"] as? RawResponse
if (body == null) {
Log.e(TAG, "Batch sub-request didn't contain a body")
} else {
processConfig(snode, body, Namespace.USER_PROFILE(), MutableUserProfile::class.java)
}
processConfig(snode, body, UserConfigType.USER_PROFILE)
}
}
}
Promise.ofSuccess(Unit)
}.fail {
Log.e(TAG, "Failed to get raw batch response", it)
}
Promise.ofSuccess(Unit)
}.fail {
Log.e(TAG, "Failed to get raw batch response", it)
}
}
}
@@ -245,23 +241,37 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth)
val requestSparseArray = SparseArray<SnodeAPI.SnodeBatchRequestInfo>()
// get messages
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(snode, auth = userAuth, maxSize = -2)
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode = snode,
publicKey = userAuth.accountId.hexString,
namespace = Namespace.DEFAULT()
),
auth = userAuth,
maxSize = -2)
.also { personalMessages ->
// namespaces here should always be set
requestSparseArray[personalMessages.namespace!!] = personalMessages
}
// get the latest convo info volatile
val hashesToExtend = mutableSetOf<String>()
configFactory.withUserConfigs {
it.allConfigs().map { config ->
hashesToExtend += config.currentHashes()
config.namespace() to SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode = snode,
auth = userAuth,
namespace = config.namespace(),
maxSize = -8
)
}
configFactory.withUserConfigs { configs ->
UserConfigType
.entries
.map { type ->
val config = configs.getConfig(type)
hashesToExtend += config.currentHashes()
type.namespace to SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode = snode,
publicKey = userAuth.accountId.hexString,
namespace = type.namespace
),
auth = userAuth,
namespace = type.namespace,
maxSize = -8
)
}
}.forEach { (namespace, request) ->
// namespaces here should always be set
requestSparseArray[namespace] = request
@@ -290,29 +300,24 @@ class Poller(private val configFactory: ConfigFactoryProtocol) {
val responseList = (rawResponses["results"] as List<RawResponse>)
// in case we had null configs, the array won't be fully populated
// index of the sparse array key iterator should be the request index, with the key being the namespace
sequenceOf(
Namespace.USER_PROFILE() to MutableUserProfile::class.java,
Namespace.CONTACTS() to MutableContacts::class.java,
Namespace.GROUPS() to MutableUserGroupsConfig::class.java,
Namespace.CONVO_INFO_VOLATILE() to MutableConversationVolatileConfig::class.java
).map { (namespace, configClass) ->
Triple(namespace, configClass, requestSparseArray.indexOfKey(namespace))
}.filter { (_, _, i) -> i >= 0 }
.forEach { (namespace, configClass, requestIndex) ->
responseList.getOrNull(requestIndex)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) {
Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
return@forEach
UserConfigType.entries
.map { type -> type to requestSparseArray.indexOfKey(type.namespace) }
.filter { (_, i) -> i >= 0 }
.forEach { (configType, requestIndex) ->
responseList.getOrNull(requestIndex)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) {
Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
return@forEach
}
val body = rawResponse["body"] as? RawResponse
if (body == null) {
Log.e(TAG, "Batch sub-request didn't contain a body")
return@forEach
}
processConfig(snode, body, configType)
}
val body = rawResponse["body"] as? RawResponse
if (body == null) {
Log.e(TAG, "Batch sub-request didn't contain a body")
return@forEach
}
processConfig(snode, body, namespace, configClass)
}
}
// the first response will be the personal messages (we want these to be processed after config messages)
val personalResponseIndex = requestSparseArray.indexOfKey(Namespace.DEFAULT())

View File

@@ -1,15 +0,0 @@
package org.session.libsession.messaging.sending_receiving.pollers
/**
* Exception thrown by a Poller-family when multiple error could have occurred.
*/
class PollerException(message: String, errors: List<Throwable>) : RuntimeException(
message,
errors.firstOrNull()
) {
init {
errors.asSequence()
.drop(1)
.forEach(this::addSuppressed)
}
}

View File

@@ -192,7 +192,7 @@ object UpdateMessageBuilder {
.put(OTHER_NAME_KEY, context.youOrSender(updateData.sessionIds.first { it != userPublicKey }))
.format()
number == 2 -> Phrase.from(context,
R.string.groupMemberNewMultiple)
R.string.groupMemberNewTwo)
.put(NAME_KEY, context.youOrSender(updateData.sessionIds.first()))
.put(OTHER_NAME_KEY, context.youOrSender(updateData.sessionIds.last()))
.format()

View File

@@ -510,8 +510,8 @@ object SnodeAPI {
}
fun buildAuthenticatedRetrieveBatchRequest(
snode: Snode,
auth: SwarmAuth,
lastHash: String?,
namespace: Int = 0,
maxSize: Int? = null
): SnodeBatchRequestInfo {
@@ -520,7 +520,7 @@ object SnodeAPI {
auth = auth,
verificationData = { ns, t -> "${Snode.Method.Retrieve.rawValue}$ns$t" },
) {
put("last_hash", database.getLastMessageHashValue(snode, auth.accountId.hexString, namespace).orEmpty())
put("last_hash", lastHash.orEmpty())
if (maxSize != null) {
put("max_size", maxSize)
}
@@ -639,13 +639,15 @@ object SnodeAPI {
return@batch
}
// For each response, parse the result, match it with the request then send
// back through the request's callback.
for ((req, resp) in batch.zip(responses.results)) {
val result = if (resp.code != 200) {
Result.failure(RuntimeException("Error with code = ${resp.code}, msg = ${resp.body}"))
} else {
runCatching {
JsonUtil.fromJson(resp.body, req.responseType)
val result = runCatching {
check(resp.code == 200) {
"Error with code = ${resp.code}, msg = ${resp.body}"
}
JsonUtil.fromJson(resp.body, req.responseType)
}
req.callback.send(result)

View File

@@ -0,0 +1,20 @@
package org.session.libsession.snode.model
import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty
data class StoreMessageResponse @JsonCreator constructor(
@JsonProperty("hash") val hash: String,
@JsonProperty("t") val timestamp: Long,
)
class RetrieveMessageResponse @JsonCreator constructor(
@JsonProperty("messages") val messages: List<Message>,
) {
class Message @JsonCreator constructor(
@JsonProperty("hash") val hash: String,
@JsonProperty("t") val timestamp: Long,
// Jackson is able to deserialize byte arrays from base64 strings
@JsonProperty("data") val data: ByteArray,
)
}

View File

@@ -1,9 +0,0 @@
package org.session.libsession.snode.model
import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty
data class StoreMessageResponse @JsonCreator constructor(
@JsonProperty("hash") val hash: String,
@JsonProperty("t") val timestamp: Long,
)

View File

@@ -17,14 +17,18 @@ import network.loki.messenger.libsession_util.ReadableGroupKeysConfig
import network.loki.messenger.libsession_util.ReadableGroupMembersConfig
import network.loki.messenger.libsession_util.ReadableUserGroupsConfig
import network.loki.messenger.libsession_util.ReadableUserProfile
import network.loki.messenger.libsession_util.util.ConfigPush
import network.loki.messenger.libsession_util.util.GroupInfo
import org.session.libsession.snode.SwarmAuth
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Namespace
interface ConfigFactoryProtocol {
val configUpdateNotifications: Flow<ConfigUpdateNotification>
fun <T> withUserConfigs(cb: (UserConfigs) -> T): T
fun <T> withMutableUserConfigs(cb: (MutableUserConfigs) -> T): T
fun mergeUserConfigs(userConfigType: UserConfigType, messages: List<ConfigMessage>)
fun <T> withGroupConfigs(groupId: AccountId, cb: (GroupConfigs) -> T): T
fun <T> withMutableGroupConfigs(groupId: AccountId, cb: (MutableGroupConfigs) -> T): T
@@ -39,8 +43,52 @@ interface ConfigFactoryProtocol {
domain: String,
closedGroupSessionId: AccountId): ByteArray?
fun mergeGroupConfigMessages(
groupId: AccountId,
keys: List<ConfigMessage>,
info: List<ConfigMessage>,
members: List<ConfigMessage>
)
fun confirmUserConfigsPushed(
contacts: Pair<ConfigPush, ConfigPushResult>? = null,
userProfile: Pair<ConfigPush, ConfigPushResult>? = null,
convoInfoVolatile: Pair<ConfigPush, ConfigPushResult>? = null,
userGroups: Pair<ConfigPush, ConfigPushResult>? = null
)
fun confirmGroupConfigsPushed(
groupId: AccountId,
members: Pair<ConfigPush, ConfigPushResult>?,
info: Pair<ConfigPush, ConfigPushResult>?,
keysPush: ConfigPushResult?
)
}
class ConfigMessage(
val hash: String,
val data: ByteArray,
val timestamp: Long
)
data class ConfigPushResult(
val hash: String,
val timestamp: Long
)
enum class UserConfigType(val namespace: Int) {
CONTACTS(Namespace.CONTACTS()),
USER_PROFILE(Namespace.USER_PROFILE()),
CONVO_INFO_VOLATILE(Namespace.CONVO_INFO_VOLATILE()),
USER_GROUPS(Namespace.GROUPS()),
}
/**
* Shortcut to get the group info for a closed group. Equivalent to: `withUserConfigs { it.userGroups.getClosedGroup(groupId) }`
*/
fun ConfigFactoryProtocol.getClosedGroup(groupId: AccountId): GroupInfo.ClosedGroupInfo? {
return withUserConfigs { it.userGroups.getClosedGroup(groupId.hexString) }
}
interface UserConfigs {
val contacts: ReadableContacts
@@ -48,7 +96,14 @@ interface UserConfigs {
val userProfile: ReadableUserProfile
val convoInfoVolatile: ReadableConversationVolatileConfig
fun allConfigs(): Sequence<ReadableConfig> = sequenceOf(contacts, userGroups, userProfile, convoInfoVolatile)
fun getConfig(type: UserConfigType): ReadableConfig {
return when (type) {
UserConfigType.CONTACTS -> contacts
UserConfigType.USER_PROFILE -> userProfile
UserConfigType.CONVO_INFO_VOLATILE -> convoInfoVolatile
UserConfigType.USER_GROUPS -> userGroups
}
}
}
interface MutableUserConfigs : UserConfigs {
@@ -57,7 +112,14 @@ interface MutableUserConfigs : UserConfigs {
override val userProfile: MutableUserProfile
override val convoInfoVolatile: MutableConversationVolatileConfig
override fun allConfigs(): Sequence<MutableConfig> = sequenceOf(contacts, userGroups, userProfile, convoInfoVolatile)
override fun getConfig(type: UserConfigType): MutableConfig {
return when (type) {
UserConfigType.CONTACTS -> contacts
UserConfigType.USER_PROFILE -> userProfile
UserConfigType.CONVO_INFO_VOLATILE -> convoInfoVolatile
UserConfigType.USER_GROUPS -> userGroups
}
}
}
interface GroupConfigs {
@@ -71,8 +133,7 @@ interface MutableGroupConfigs : GroupConfigs {
override val groupMembers: MutableGroupMembersConfig
override val groupKeys: MutableGroupKeysConfig
fun loadKeys(message: ByteArray, hash: String, timestamp: Long): Boolean
fun rekeys()
fun rekey()
}
sealed interface ConfigUpdateNotification {
@@ -80,30 +141,3 @@ sealed interface ConfigUpdateNotification {
data class GroupConfigsUpdated(val groupId: AccountId) : ConfigUpdateNotification
data class GroupConfigsDeleted(val groupId: AccountId) : ConfigUpdateNotification
}
//interface ConfigFactoryUpdateListener {
// fun notifyUpdates(forConfigObject: Config, messageTimestamp: Long)
//}
///**
// * Access group configs if they exist, otherwise return null.
// *
// * Note: The config objects will be closed after the callback is executed. Any attempt
// * to store the config objects will result in a native crash.
// */
//inline fun <T: Any> ConfigFactoryProtocol.withGroupConfigsOrNull(
// groupId: AccountId,
// cb: (GroupInfoConfig, GroupMembersConfig, GroupKeysConfig) -> T
//): T? {
// getGroupInfoConfig(groupId)?.use { groupInfo ->
// getGroupMemberConfig(groupId)?.use { groupMembers ->
// getGroupKeysConfig(groupId, groupInfo, groupMembers)?.use { groupKeys ->
// return cb(groupInfo, groupMembers, groupKeys)
// }
// }
// }
//
// return null
//}

View File

@@ -41,8 +41,6 @@ import javax.inject.Singleton
interface TextSecurePreferences {
fun getLastConfigurationSyncTime(): Long
fun setLastConfigurationSyncTime(value: Long)
fun getConfigurationMessageSynced(): Boolean
fun setConfigurationMessageSynced(value: Boolean)
@@ -108,6 +106,7 @@ interface TextSecurePreferences {
fun setUpdateApkDigest(value: String?)
fun getUpdateApkDigest(): String?
fun getLocalNumber(): String?
fun watchLocalNumber(): StateFlow<String?>
fun getHasLegacyConfig(): Boolean
fun setHasLegacyConfig(newValue: Boolean)
fun setLocalNumber(localNumber: String)
@@ -203,6 +202,10 @@ interface TextSecurePreferences {
@JvmStatic
var pushSuffix = ""
// This is a stop-gap solution for static access to shared preference.
internal lateinit var preferenceInstance: TextSecurePreferences
const val DISABLE_PASSPHRASE_PREF = "pref_disable_passphrase"
const val LANGUAGE_PREF = "pref_language"
const val THREAD_TRIM_NOW = "pref_trim_now"
@@ -219,7 +222,7 @@ interface TextSecurePreferences {
const val SCREEN_SECURITY_PREF = "pref_screen_security"
const val ENTER_SENDS_PREF = "pref_enter_sends"
const val THREAD_TRIM_ENABLED = "pref_trim_threads"
const val LOCAL_NUMBER_PREF = "pref_local_number"
internal const val LOCAL_NUMBER_PREF = "pref_local_number"
const val REGISTERED_GCM_PREF = "pref_gcm_registered"
const val UPDATE_APK_REFRESH_TIME_PREF = "pref_update_apk_refresh_time"
const val UPDATE_APK_DOWNLOAD_ID = "pref_update_apk_download_id"
@@ -265,7 +268,6 @@ interface TextSecurePreferences {
const val GIF_METADATA_WARNING = "has_seen_gif_metadata_warning"
const val GIF_GRID_LAYOUT = "pref_gif_grid_layout"
val IS_PUSH_ENABLED get() = "pref_is_using_fcm$pushSuffix"
const val LAST_CONFIGURATION_SYNC_TIME = "pref_last_configuration_sync_time"
const val CONFIGURATION_SYNCED = "pref_configuration_synced"
const val LAST_PROFILE_UPDATE_TIME = "pref_last_profile_update_time"
const val LAST_OPEN_DATE = "pref_last_open_date"
@@ -308,16 +310,16 @@ interface TextSecurePreferences {
// for the lifetime of the Session installation.
const val HAVE_WARNED_USER_ABOUT_SAVING_ATTACHMENTS = "libsession.HAVE_WARNED_USER_ABOUT_SAVING_ATTACHMENTS"
@JvmStatic
fun getLastConfigurationSyncTime(context: Context): Long {
return getLongPreference(context, LAST_CONFIGURATION_SYNC_TIME, 0)
}
@JvmStatic
fun setLastConfigurationSyncTime(context: Context, value: Long) {
setLongPreference(context, LAST_CONFIGURATION_SYNC_TIME, value)
}
// @JvmStatic
// fun getLastConfigurationSyncTime(context: Context): Long {
// return getLongPreference(context, LAST_CONFIGURATION_SYNC_TIME, 0)
// }
//
// @JvmStatic
// fun setLastConfigurationSyncTime(context: Context, value: Long) {
// setLongPreference(context, LAST_CONFIGURATION_SYNC_TIME, value)
// }
//
@JvmStatic
fun getConfigurationMessageSynced(context: Context): Boolean {
return getBooleanPreference(context, CONFIGURATION_SYNCED, false)
@@ -629,9 +631,13 @@ interface TextSecurePreferences {
return getStringPreference(context, UPDATE_APK_DIGEST, null)
}
@Deprecated(
"Use the dependency-injected TextSecurePreference instance instead",
ReplaceWith("TextSecurePreferences.getLocalNumber()")
)
@JvmStatic
fun getLocalNumber(context: Context): String? {
return getStringPreference(context, LOCAL_NUMBER_PREF, null)
return preferenceInstance.getLocalNumber()
}
@JvmStatic
@@ -645,13 +651,6 @@ interface TextSecurePreferences {
_events.tryEmit(HAS_RECEIVED_LEGACY_CONFIG)
}
fun setLocalNumber(context: Context, localNumber: String) {
setStringPreference(context, LOCAL_NUMBER_PREF, localNumber.toLowerCase())
}
fun removeLocalNumber(context: Context) {
removePreference(context, LOCAL_NUMBER_PREF)
}
@JvmStatic
fun isEnterSendsEnabled(context: Context): Boolean {
@@ -994,14 +993,12 @@ interface TextSecurePreferences {
class AppTextSecurePreferences @Inject constructor(
@ApplicationContext private val context: Context
): TextSecurePreferences {
override fun getLastConfigurationSyncTime(): Long {
return getLongPreference(TextSecurePreferences.LAST_CONFIGURATION_SYNC_TIME, 0)
init {
// Should remove once all static access to the companion objects is removed
TextSecurePreferences.preferenceInstance = this
}
override fun setLastConfigurationSyncTime(value: Long) {
setLongPreference(TextSecurePreferences.LAST_CONFIGURATION_SYNC_TIME, value)
}
private val localNumberState = MutableStateFlow(getStringPreference(TextSecurePreferences.LOCAL_NUMBER_PREF, null))
override fun getConfigurationMessageSynced(): Boolean {
return getBooleanPreference(TextSecurePreferences.CONFIGURATION_SYNCED, false)
@@ -1262,7 +1259,11 @@ class AppTextSecurePreferences @Inject constructor(
}
override fun getLocalNumber(): String? {
return getStringPreference(TextSecurePreferences.LOCAL_NUMBER_PREF, null)
return localNumberState.value
}
override fun watchLocalNumber(): StateFlow<String?> {
return localNumberState
}
override fun getHasLegacyConfig(): Boolean {
@@ -1275,10 +1276,13 @@ class AppTextSecurePreferences @Inject constructor(
}
override fun setLocalNumber(localNumber: String) {
setStringPreference(TextSecurePreferences.LOCAL_NUMBER_PREF, localNumber.toLowerCase())
val normalised = localNumber.lowercase()
setStringPreference(TextSecurePreferences.LOCAL_NUMBER_PREF, normalised)
localNumberState.value = normalised
}
override fun removeLocalNumber() {
localNumberState.value = null
removePreference(TextSecurePreferences.LOCAL_NUMBER_PREF)
}