feat: introduce the handling of poll messages for closed group poller

This commit is contained in:
0x330a
2023-09-13 12:11:42 +10:00
parent df29ed8f16
commit 040696c08b
6 changed files with 141 additions and 28 deletions

View File

@@ -906,8 +906,8 @@ open class Storage(context: Context, helper: SQLCipherOpenHelper, private val co
val group = userGroups.createGroup() val group = userGroups.createGroup()
val adminKey = group.adminKey val adminKey = group.adminKey
userGroups.set(group) userGroups.set(group)
val groupInfo = configFactory.getOrConstructGroupInfoConfig(group.groupSessionId) ?: return Optional.absent() val groupInfo = configFactory.getGroupInfoConfig(group.groupSessionId) ?: return Optional.absent()
val groupMembers = configFactory.getOrConstructGroupMemberConfig(group.groupSessionId) ?: return Optional.absent() val groupMembers = configFactory.getGroupMemberConfig(group.groupSessionId) ?: return Optional.absent()
with (groupInfo) { with (groupInfo) {
setName(groupName) setName(groupName)
@@ -921,7 +921,7 @@ open class Storage(context: Context, helper: SQLCipherOpenHelper, private val co
val groupKeys = GroupKeysConfig.newInstance( val groupKeys = GroupKeysConfig.newInstance(
userKp.secretKey.asBytes, userKp.secretKey.asBytes,
Hex.fromStringCondensed(group.groupSessionId.publicKey), Hex.fromStringCondensed(group.groupSessionId.publicKey),
group.adminKey, adminKey,
info = groupInfo, info = groupInfo,
members = groupMembers members = groupMembers
) )
@@ -930,7 +930,6 @@ open class Storage(context: Context, helper: SQLCipherOpenHelper, private val co
val configTtl = 1 * 24 * 60 * 60 * 1000L // TODO: just testing here, 1 day so we don't fill large space on network val configTtl = 1 * 24 * 60 * 60 * 1000L // TODO: just testing here, 1 day so we don't fill large space on network
// Test the sending // Test the sending
val keyPush = groupKeys.pendingConfig() ?: return Optional.absent() val keyPush = groupKeys.pendingConfig() ?: return Optional.absent()
val pendingKey = groupKeys.pendingKey() ?: return Optional.absent()
val keysSnodeMessage = SnodeMessage( val keysSnodeMessage = SnodeMessage(
newGroupRecipient, newGroupRecipient,
@@ -1180,7 +1179,7 @@ open class Storage(context: Context, helper: SQLCipherOpenHelper, private val co
} }
override fun getMembers(groupPublicKey: String): List<LibSessionGroupMember> = override fun getMembers(groupPublicKey: String): List<LibSessionGroupMember> =
configFactory.getOrConstructGroupMemberConfig(SessionId.from(groupPublicKey))?.all()?.toList() ?: emptyList() configFactory.getGroupMemberConfig(SessionId.from(groupPublicKey))?.all()?.toList() ?: emptyList()
override fun setServerCapabilities(server: String, capabilities: List<String>) { override fun setServerCapabilities(server: String, capabilities: List<String>) {
return DatabaseComponent.get(context).lokiAPIDatabase().setServerCapabilities(server, capabilities) return DatabaseComponent.get(context).lokiAPIDatabase().setServerCapabilities(server, capabilities)

View File

@@ -176,7 +176,7 @@ class ConfigFactory(
it.adminKey to it.authData it.adminKey to it.authData
} }
override fun getOrConstructGroupInfoConfig(groupSessionId: SessionId): GroupInfoConfig? = getGroupAuthInfo(groupSessionId)?.let { (sk, _) -> override fun getGroupInfoConfig(groupSessionId: SessionId): GroupInfoConfig? = getGroupAuthInfo(groupSessionId)?.let { (sk, _) ->
// get any potential initial dumps // get any potential initial dumps
val dump = configDatabase.retrieveConfigAndHashes( val dump = configDatabase.retrieveConfigAndHashes(
SharedConfigMessage.Kind.CLOSED_GROUP_INFO.name, SharedConfigMessage.Kind.CLOSED_GROUP_INFO.name,
@@ -191,10 +191,10 @@ class ConfigFactory(
val (userSk, _) = maybeGetUserInfo() ?: return@let null val (userSk, _) = maybeGetUserInfo() ?: return@let null
// Get the group info or return early // Get the group info or return early
val info = getOrConstructGroupInfoConfig(groupSessionId) ?: return@let null val info = getGroupInfoConfig(groupSessionId) ?: return@let null
// Get the group members or return early // Get the group members or return early
val members = getOrConstructGroupMemberConfig(groupSessionId) ?: return@let null val members = getGroupMemberConfig(groupSessionId) ?: return@let null
// Get the dump or empty // Get the dump or empty
val dump = configDatabase.retrieveConfigAndHashes( val dump = configDatabase.retrieveConfigAndHashes(
@@ -213,7 +213,7 @@ class ConfigFactory(
) )
} }
override fun getOrConstructGroupMemberConfig(groupSessionId: SessionId): GroupMembersConfig? = getGroupAuthInfo(groupSessionId)?.let { (sk, auth) -> override fun getGroupMemberConfig(groupSessionId: SessionId): GroupMembersConfig? = getGroupAuthInfo(groupSessionId)?.let { (sk, auth) ->
// Get initial dump if we have one // Get initial dump if we have one
val dump = configDatabase.retrieveConfigAndHashes( val dump = configDatabase.retrieveConfigAndHashes(
SharedConfigMessage.Kind.CLOSED_GROUP_MEMBERS.name, SharedConfigMessage.Kind.CLOSED_GROUP_MEMBERS.name,

View File

@@ -32,6 +32,10 @@ sealed class GroupInfo {
return result return result
} }
fun signingKey(): ByteArray {
return if (adminKey.isNotEmpty()) adminKey else authData
}
} }
data class LegacyGroupInfo( data class LegacyGroupInfo(

View File

@@ -4,9 +4,15 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import network.loki.messenger.libsession_util.GroupInfoConfig
import network.loki.messenger.libsession_util.GroupKeysConfig
import network.loki.messenger.libsession_util.GroupMembersConfig
import network.loki.messenger.libsession_util.util.GroupInfo
import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeAPI
import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import org.session.libsignal.utilities.SessionId import org.session.libsignal.utilities.SessionId
class ClosedGroupPoller(private val executor: CoroutineScope, class ClosedGroupPoller(private val executor: CoroutineScope,
@@ -23,8 +29,18 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
fun start() { fun start() {
job?.cancel() job?.cancel()
job = executor.launch { job = executor.launch {
val nextPoll = poll() val closedGroups = configFactoryProtocol.userGroups?: return@launch
while (true) {
val group = closedGroups.getClosedGroup(closedGroupSessionId.hexString()) ?: break
val nextPoll = poll(group)
if (nextPoll != null) {
delay(nextPoll) delay(nextPoll)
} else {
Log.d("ClosedGroupPoller", "Stopping the closed group poller")
return@launch
}
}
// assume null poll time means don't continue polling, either the group has been deleted or something else
} }
} }
@@ -32,17 +48,92 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
job?.cancel() job?.cancel()
} }
fun poll(): Long { fun poll(group: GroupInfo.ClosedGroupInfo): Long? {
try { try {
val snode = SnodeAPI.getSingleTargetSnode(closedGroupSessionId.hexString()).get() val snode = SnodeAPI.getSingleTargetSnode(closedGroupSessionId.hexString()).get()
val info = configFactoryProtocol.getOrConstructGroupInfoConfig(closedGroupSessionId) val info = configFactoryProtocol.getGroupInfoConfig(closedGroupSessionId) ?: return null
val members = configFactoryProtocol.getOrConstructGroupMemberConfig(closedGroupSessionId) val members = configFactoryProtocol.getGroupMemberConfig(closedGroupSessionId) ?: return null
val keys = configFactoryProtocol.getGroupKeysConfig(closedGroupSessionId) val keys = configFactoryProtocol.getGroupKeysConfig(closedGroupSessionId) ?: return null
val keysIndex = 0
val infoIndex = 1
val membersIndex = 2
val messageIndex = 3
val messagePoll = SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode,
closedGroupSessionId.hexString(),
Namespace.DEFAULT,
maxSize = null,
group.signingKey()
) ?: return null
val infoPoll = SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode,
closedGroupSessionId.hexString(),
info.configNamespace(),
maxSize = null,
group.signingKey()
) ?: return null
val membersPoll = SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode,
closedGroupSessionId.hexString(),
members.configNamespace(),
maxSize = null,
group.signingKey()
) ?: return null
val keysPoll = SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode,
closedGroupSessionId.hexString(),
GroupKeysConfig.storageNamespace(),
maxSize = null,
group.signingKey()
) ?: return null
val pollResult = SnodeAPI.getRawBatchResponse(
snode,
closedGroupSessionId.hexString(),
listOf(keysPoll, infoPoll, membersPoll, messagePoll)
).get()
// TODO: add the extend duration TTLs for known hashes here
(pollResult["body"] as List<RawResponse>).forEachIndexed { index, response ->
when (index) {
keysIndex -> handleKeyPoll(response, keys, info, members)
infoIndex -> handleInfo(response, info)
membersIndex -> handleMembers(response, members)
messageIndex -> handleMessages(response)
}
}
} catch (e: Exception) { } catch (e: Exception) {
Log.e("GroupPoller", "Polling failed for group", e) Log.e("GroupPoller", "Polling failed for group", e)
return POLL_INTERVAL
} }
return POLL_INTERVAL // this might change in future return POLL_INTERVAL // this might change in future
} }
private fun handleKeyPoll(response: RawResponse,
keysConfig: GroupKeysConfig,
infoConfig: GroupInfoConfig,
membersConfig: GroupMembersConfig) {
}
private fun handleInfo(response: RawResponse,
infoConfig: GroupInfoConfig) {
}
private fun handleMembers(response: RawResponse,
membersConfig: GroupMembersConfig) {
}
private fun handleMessages(response: RawResponse) {
// TODO
}
} }

View File

@@ -460,19 +460,21 @@ object SnodeAPI {
) )
} }
fun buildAuthenticatedRetrieveBatchRequest(snode: Snode, publicKey: String, namespace: Int = 0, maxSize: Int? = null): SnodeBatchRequestInfo? { fun buildAuthenticatedRetrieveBatchRequest(snode: Snode,
publicKey: String,
namespace: Int,
maxSize: Int? = null,
signingKey: ByteArray,
ed25519PublicKey: Key? = null): SnodeBatchRequestInfo? {
val lastHashValue = database.getLastMessageHashValue(snode, publicKey, namespace) ?: "" val lastHashValue = database.getLastMessageHashValue(snode, publicKey, namespace) ?: ""
val params = mutableMapOf<String, Any>( val params = mutableMapOf<String, Any>(
"pubkey" to publicKey, "pubkey" to publicKey,
"last_hash" to lastHashValue, "last_hash" to lastHashValue
) )
val userEd25519KeyPair = try { if (ed25519PublicKey != null) {
MessagingModuleConfiguration.shared.getUserED25519KeyPair() ?: return null params["pubkey_ed25519"] = ed25519PublicKey.asHexString
} catch (e: Exception) {
return null
} }
val ed25519PublicKey = userEd25519KeyPair.publicKey.asHexString val timestamp = nowWithOffset
val timestamp = System.currentTimeMillis() + clockOffset
val signature = ByteArray(Sign.BYTES) val signature = ByteArray(Sign.BYTES)
val verificationData = if (namespace == 0) "retrieve$timestamp".toByteArray() val verificationData = if (namespace == 0) "retrieve$timestamp".toByteArray()
else "retrieve$namespace$timestamp".toByteArray() else "retrieve$namespace$timestamp".toByteArray()
@@ -481,14 +483,13 @@ object SnodeAPI {
signature, signature,
verificationData, verificationData,
verificationData.size.toLong(), verificationData.size.toLong(),
userEd25519KeyPair.secretKey.asBytes signingKey
) )
} catch (e: Exception) { } catch (e: Exception) {
Log.e("Loki", "Signing data failed with user secret key", e) Log.e("BatchRetrieve", "Signing data failed with provided signing key", e)
return null return null
} }
params["timestamp"] = timestamp params["timestamp"] = timestamp
params["pubkey_ed25519"] = ed25519PublicKey
params["signature"] = Base64.encodeBytes(signature) params["signature"] = Base64.encodeBytes(signature)
if (namespace != 0) { if (namespace != 0) {
params["namespace"] = namespace params["namespace"] = namespace
@@ -503,6 +504,24 @@ object SnodeAPI {
) )
} }
fun buildAuthenticatedRetrieveBatchRequest(snode: Snode, publicKey: String, namespace: Int = 0, maxSize: Int? = null): SnodeBatchRequestInfo? {
val userEd25519KeyPair = try {
MessagingModuleConfiguration.shared.getUserED25519KeyPair() ?: return null
} catch (e: Exception) {
return null
}
val secretKey = userEd25519KeyPair.secretKey.asBytes
val ed25519PublicKey = userEd25519KeyPair.publicKey
return buildAuthenticatedRetrieveBatchRequest(
snode,
publicKey,
namespace,
maxSize,
secretKey,
ed25519PublicKey
)
}
fun buildAuthenticatedAlterTtlBatchRequest( fun buildAuthenticatedAlterTtlBatchRequest(
messageHashes: List<String>, messageHashes: List<String>,
newExpiry: Long, newExpiry: Long,

View File

@@ -16,8 +16,8 @@ interface ConfigFactoryProtocol {
val convoVolatile: ConversationVolatileConfig? val convoVolatile: ConversationVolatileConfig?
val userGroups: UserGroupsConfig? val userGroups: UserGroupsConfig?
fun getOrConstructGroupInfoConfig(groupSessionId: SessionId): GroupInfoConfig? fun getGroupInfoConfig(groupSessionId: SessionId): GroupInfoConfig?
fun getOrConstructGroupMemberConfig(groupSessionId: SessionId): GroupMembersConfig? fun getGroupMemberConfig(groupSessionId: SessionId): GroupMembersConfig?
fun getGroupKeysConfig(groupSessionId: SessionId): GroupKeysConfig? fun getGroupKeysConfig(groupSessionId: SessionId): GroupKeysConfig?
fun getUserConfigs(): List<ConfigBase> fun getUserConfigs(): List<ConfigBase>