feat: basic polling happening with hashes merging

This commit is contained in:
0x330a
2023-09-13 18:29:19 +10:00
parent 040696c08b
commit 82e3516a60
6 changed files with 156 additions and 15 deletions

View File

@@ -1,6 +1,7 @@
package org.session.libsession.messaging.sending_receiving.pollers
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -11,6 +12,7 @@ import network.loki.messenger.libsession_util.util.GroupInfo
import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import org.session.libsignal.utilities.SessionId
@@ -19,6 +21,32 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
private val closedGroupSessionId: SessionId,
private val configFactoryProtocol: ConfigFactoryProtocol) {
data class ParsedRawMessage(
val data: ByteArray,
val hash: String,
val timestamp: Long
) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as ParsedRawMessage
if (!data.contentEquals(other.data)) return false
if (hash != other.hash) return false
if (timestamp != other.timestamp) return false
return true
}
override fun hashCode(): Int {
var result = data.contentHashCode()
result = 31 * result + hash.hashCode()
result = 31 * result + timestamp.hashCode()
return result
}
}
companion object {
const val POLL_INTERVAL = 3_000L
}
@@ -27,10 +55,14 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
private var job: Job? = null
fun start() {
if (isRunning) return // already started, don't restart
Log.d("ClosedGroupPoller", "Starting closed group poller for ${closedGroupSessionId.hexString().take(4)}")
job?.cancel()
job = executor.launch {
job = executor.launch(Dispatchers.IO) {
val closedGroups = configFactoryProtocol.userGroups?: return@launch
while (true) {
isRunning = true
while (isRunning) {
val group = closedGroups.getClosedGroup(closedGroupSessionId.hexString()) ?: break
val nextPoll = poll(group)
if (nextPoll != null) {
@@ -45,7 +77,9 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
}
fun stop() {
isRunning = false
job?.cancel()
job = null
}
fun poll(group: GroupInfo.ClosedGroupInfo): Long? {
@@ -97,15 +131,21 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
// TODO: add the extend duration TTLs for known hashes here
(pollResult["body"] as List<RawResponse>).forEachIndexed { index, response ->
// if poll result body is null here we don't have any things ig
(pollResult["results"] as List<RawResponse>).forEachIndexed { index, response ->
when (index) {
keysIndex -> handleKeyPoll(response, keys, info, members)
infoIndex -> handleInfo(response, info)
membersIndex -> handleMembers(response, members)
messageIndex -> handleMessages(response)
messageIndex -> handleMessages(response, keys)
}
}
configFactoryProtocol.saveGroupConfigs(keys, info, members)
keys.free()
info.free()
members.free()
} catch (e: Exception) {
Log.e("GroupPoller", "Polling failed for group", e)
return POLL_INTERVAL
@@ -113,27 +153,55 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
return POLL_INTERVAL // this might change in future
}
private fun parseMessages(response: RawResponse): List<ParsedRawMessage> {
val body = response["body"] as? RawResponse
if (body == null) {
Log.e("GroupPoller", "Batch parse messages contained no body!")
return emptyList()
}
val messages = body["messages"] as? List<*> ?: return emptyList()
return messages.mapNotNull { messageMap ->
val rawMessageAsJSON = messageMap as? Map<*, *> ?: return@mapNotNull null
val base64EncodedData = rawMessageAsJSON["data"] as? String ?: return@mapNotNull null
val hash = rawMessageAsJSON["hash"] as? String ?: return@mapNotNull null
val timestamp = rawMessageAsJSON["timestamp"] as? Long ?: return@mapNotNull null
val data = base64EncodedData.let { Base64.decode(it) }
ParsedRawMessage(data, hash, timestamp)
}
}
private fun handleKeyPoll(response: RawResponse,
keysConfig: GroupKeysConfig,
infoConfig: GroupInfoConfig,
membersConfig: GroupMembersConfig) {
// get all the data to hash objects and process them
parseMessages(response).forEach { (message, hash, timestamp) ->
keysConfig.loadKey(message, hash, timestamp, infoConfig, membersConfig)
Log.d("ClosedGroupPoller", "Merged $hash for keys")
}
}
private fun handleInfo(response: RawResponse,
infoConfig: GroupInfoConfig) {
parseMessages(response).forEach { (message, hash, _) ->
infoConfig.merge(hash to message)
Log.d("ClosedGroupPoller", "Merged $hash for info")
}
}
private fun handleMembers(response: RawResponse,
membersConfig: GroupMembersConfig) {
parseMessages(response).forEach { (message, hash, _) ->
membersConfig.merge(hash to message)
Log.d("ClosedGroupPoller", "Merged $hash for members")
}
}
private fun handleMessages(response: RawResponse) {
// TODO
private fun handleMessages(response: RawResponse, keysConfig: GroupKeysConfig) {
val messages = parseMessages(response)
if (messages.isNotEmpty()) {
// TODO: process decrypting bundles
}
}
}