fix: fixing up LokiMessageDatabase.kt table structure, deletion closer to finished

This commit is contained in:
jubb
2021-04-29 17:13:42 +10:00
parent 51554f1cdf
commit ef19c0d10e
15 changed files with 138 additions and 52 deletions

View File

@@ -11,7 +11,8 @@ import java.io.InputStream
interface MessageDataProvider {
fun getMessageID(serverID: Long): Long?
fun deleteMessage(messageID: Long)
fun getMessageID(serverId: Long, threadId: Long): Pair<Long, Boolean>?
fun deleteMessage(messageID: Long, isSms: Boolean)
fun getDatabaseAttachment(attachmentId: Long): DatabaseAttachment?

View File

@@ -64,7 +64,7 @@ interface StorageProtocol {
// Open Groups
fun getThreadID(openGroupID: String): String?
fun addOpenGroup(server: String, channel: Long)
fun setOpenGroupServerMessageID(messageID: Long, serverID: Long)
fun setOpenGroupServerMessageID(messageID: Long, serverID: Long, threadID: Long, isSms: Boolean)
fun getQuoteServerID(quoteID: Long, publicKey: String): Long?
// Open Group Public Keys
@@ -137,7 +137,7 @@ interface StorageProtocol {
fun getOrCreateThreadIdFor(address: Address): Long
fun getOrCreateThreadIdFor(publicKey: String, groupPublicKey: String?, openGroupID: String?): Long
fun getThreadIdFor(address: Address): Long?
fun getThreadIdForMms(messageId: Long): Long
fun getThreadIdForMms(mmsId: Long): Long
// Session Request
fun getSessionRequestSentTimestamp(publicKey: String): Long?

View File

@@ -2,6 +2,7 @@ package org.session.libsession.messaging.open_groups
import com.fasterxml.jackson.databind.PropertyNamingStrategy
import com.fasterxml.jackson.databind.annotation.JsonNaming
import com.fasterxml.jackson.databind.type.TypeFactory
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import nl.komponents.kovenant.Kovenant
@@ -28,6 +29,8 @@ import org.session.libsignal.utilities.logging.Log
import org.whispersystems.curve25519.Curve25519
import java.util.*
typealias DeletionList = List<OpenGroupAPIV2.MessageDeletion>
object OpenGroupAPIV2 {
private val moderators: HashMap<String, Set<String>> = hashMapOf() // Server URL to (channel ID to set of moderator IDs)
@@ -60,7 +63,7 @@ object OpenGroupAPIV2 {
val imageID: String?
)
@JsonNaming(value = PropertyNamingStrategy.SnakeCaseStrategy::class)
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy::class)
data class CompactPollRequest(val roomId: String,
val authToken: String,
val fromDeletionServerId: Long?,
@@ -72,6 +75,11 @@ object OpenGroupAPIV2 {
val moderators: List<String>
)
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy::class)
data class MessageDeletion @JvmOverloads constructor(val id: Long = 0,
val deletedMessageId: Long = 0
)
data class Request(
val verb: HTTP.Verb,
val room: String?,
@@ -93,7 +101,7 @@ object OpenGroupAPIV2 {
return RequestBody.create(MediaType.get("application/json"), parametersAsJSON)
}
private fun send(request: Request): Promise<Map<*, *>, Exception> {
private fun send(request: Request, isJsonRequired: Boolean = true): Promise<Map<*, *>, Exception> {
val parsed = HttpUrl.parse(request.server) ?: return Promise.ofFail(Error.INVALID_URL)
val urlBuilder = HttpUrl.Builder()
.scheme(parsed.scheme())
@@ -128,7 +136,7 @@ object OpenGroupAPIV2 {
if (request.useOnionRouting) {
val publicKey = MessagingModuleConfiguration.shared.storage.getOpenGroupPublicKey(request.server)
?: return Promise.ofFail(Error.NO_PUBLIC_KEY)
return OnionRequestAPI.sendOnionRequest(requestBuilder.build(), request.server, publicKey)
return OnionRequestAPI.sendOnionRequest(requestBuilder.build(), request.server, publicKey, isJSONRequired = isJsonRequired)
.fail { e ->
if (e is OnionRequestAPI.HTTPRequestFailedAtDestinationException && e.statusCode == 401) {
val storage = MessagingModuleConfiguration.shared.storage
@@ -289,7 +297,7 @@ object OpenGroupAPIV2 {
}
}
fun getDeletedMessages(room: String, server: String): Promise<List<Long>, Exception> {
fun getDeletedMessages(room: String, server: String): Promise<List<MessageDeletion>, Exception> {
val storage = MessagingModuleConfiguration.shared.storage
val queryParameters = mutableMapOf<String, String>()
storage.getLastDeletionServerId(room, server)?.let { last ->
@@ -297,12 +305,13 @@ object OpenGroupAPIV2 {
}
val request = Request(verb = GET, room = room, server = server, endpoint = "deleted_messages", queryParameters = queryParameters)
return send(request).map(sharedContext) { json ->
@Suppress("UNCHECKED_CAST") val serverIDs = (json["ids"] as? List<Int>)?.map { it.toLong() }
?: throw Error.PARSING_FAILED
val type = TypeFactory.defaultInstance().constructCollectionType(List::class.java, MessageDeletion::class.java)
val idsAsString = JsonUtil.toJson(json["ids"])
val serverIDs = JsonUtil.fromJson<List<MessageDeletion>>(idsAsString, type) ?: throw Error.PARSING_FAILED
val lastMessageServerId = storage.getLastDeletionServerId(room, server) ?: 0
val serverID = serverIDs.maxOrNull() ?: 0
if (serverID > lastMessageServerId) {
storage.setLastDeletionServerId(room, server, serverID)
val serverID = serverIDs.maxByOrNull {it.id } ?: serverIDs.first()
if (serverID.id > lastMessageServerId) {
storage.setLastDeletionServerId(room, server, serverID.id)
}
serverIDs
}
@@ -343,14 +352,14 @@ object OpenGroupAPIV2 {
// endregion
// region General
fun getCompactPoll(rooms: List<String>, server: String): Promise<Map<String,CompactPollResult>, Exception> {
val requestAuths = rooms.associateWith { room -> getAuthToken(room,server) }
fun getCompactPoll(rooms: List<String>, server: String): Promise<Map<String, CompactPollResult>, Exception> {
val requestAuths = rooms.associateWith { room -> getAuthToken(room, server) }
val storage = MessagingModuleConfiguration.shared.storage
val requests = rooms.mapNotNull { room ->
val authToken = try {
requestAuths[room]?.get()
} catch (e: Exception) {
Log.e("Loki", "Failed to get auth token for $room",e)
Log.e("Loki", "Failed to get auth token for $room", e)
null
} ?: return@mapNotNull null
@@ -363,7 +372,7 @@ object OpenGroupAPIV2 {
val request = Request(verb = POST, room = null, server = server, endpoint = "compact_poll", isAuthRequired = false, parameters = mapOf("requests" to requests))
// build a request for all rooms
return send(request = request).map(sharedContext) { json ->
val results = json["results"] as? Map<*,*> ?: throw Error.PARSING_FAILED
val results = json["results"] as? Map<*, *> ?: throw Error.PARSING_FAILED
TODO()
}
}

View File

@@ -12,16 +12,15 @@ import org.session.libsession.messaging.messages.control.ClosedGroupControlMessa
import org.session.libsession.messaging.messages.control.ConfigurationMessage
import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
import org.session.libsession.messaging.messages.visible.*
import org.session.libsession.messaging.open_groups.OpenGroupAPI
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
import org.session.libsession.messaging.open_groups.OpenGroupMessage
import org.session.libsession.messaging.open_groups.OpenGroupMessageV2
import org.session.libsession.messaging.open_groups.*
import org.session.libsession.messaging.threads.Address
import org.session.libsession.messaging.threads.recipients.Recipient
import org.session.libsession.messaging.utilities.MessageWrapper
import org.session.libsession.snode.RawResponsePromise
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeModule
import org.session.libsession.snode.SnodeMessage
import org.session.libsession.utilities.GroupUtil
import org.session.libsession.utilities.SSKEnvironment
import org.session.libsignal.service.internal.push.PushTransportDetails
import org.session.libsignal.service.internal.push.SignalServiceProtos
@@ -290,8 +289,12 @@ object MessageSender {
// Ignore future self-sends
storage.addReceivedMessageTimestamp(message.sentTimestamp!!)
// Track the open group server message ID
if (message.openGroupServerMessageID != null) {
storage.setOpenGroupServerMessageID(messageId, message.openGroupServerMessageID!!)
if (message.openGroupServerMessageID != null && destination is Destination.OpenGroupV2) {
val encoded = GroupUtil.getEncodedOpenGroupID("${destination.server}.${destination.room}".toByteArray())
val threadID = storage.getThreadIdFor(Address.fromSerialized(encoded))
if (threadID != null && threadID >= 0) {
storage.setOpenGroupServerMessageID(messageId, message.openGroupServerMessageID!!, threadID, !(message as VisibleMessage).isMediaMessage())
}
}
// Mark the message as sent
storage.markAsSent(message.sentTimestamp!!, message.sender?:userPublicKey)

View File

@@ -151,9 +151,19 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
val storage = MessagingModuleConfiguration.shared.storage
val context = MessagingModuleConfiguration.shared.context
val userPublicKey = storage.getUserPublicKey()
// Get or create thread
val threadID = storage.getOrCreateThreadIdFor(message.syncTarget
?: message.sender!!, message.groupPublicKey, openGroupID)
val openGroup = threadID.let {
storage.getOpenGroup(it.toString())
}
// Update profile if needed
val newProfile = message.profile
if (newProfile != null && openGroupID.isNullOrEmpty() && userPublicKey != message.sender) {
if (newProfile != null && userPublicKey != message.sender && openGroup == null) {
val profileManager = SSKEnvironment.shared.profileManager
val recipient = Recipient.from(context, Address.fromSerialized(message.sender!!), false)
val displayName = newProfile.displayName!!
@@ -172,9 +182,6 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
}
}
}
// Get or create thread
val threadID = storage.getOrCreateThreadIdFor(message.syncTarget
?: message.sender!!, message.groupPublicKey, openGroupID)
// Parse quote if needed
var quoteModel: QuoteModel? = null
if (message.quote != null && proto.dataMessage.hasQuote()) {
@@ -224,6 +231,10 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
JobQueue.shared.add(downloadJob)
}
}
val openGroupServerID = message.openGroupServerMessageID
if (openGroupServerID != null) {
storage.setOpenGroupServerMessageID(messageID, openGroupServerID, threadID, !(message.isMediaMessage() || attachments.isNotEmpty()))
}
// Cancel any typing indicators if needed
cancelTypingIndicatorsIfNeeded(message.sender!!)
//Notify the user if needed

View File

@@ -9,6 +9,8 @@ import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.open_groups.OpenGroup
import org.session.libsession.messaging.open_groups.OpenGroupAPI
import org.session.libsession.messaging.open_groups.OpenGroupMessage
import org.session.libsession.messaging.threads.Address
import org.session.libsession.utilities.GroupUtil
import org.session.libsignal.service.internal.push.SignalServiceProtos.*
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.utilities.successBackground
@@ -210,10 +212,13 @@ class OpenGroupPoller(private val openGroup: OpenGroup, private val executorServ
}
private fun pollForDeletedMessages() {
val messagingModule = MessagingModuleConfiguration.shared
val address = GroupUtil.getEncodedOpenGroupID(openGroup.id.toByteArray())
val threadId = messagingModule.storage.getThreadIdFor(Address.fromSerialized(address)) ?: return
OpenGroupAPI.getDeletedMessageServerIDs(openGroup.channel, openGroup.server).success { deletedMessageServerIDs ->
val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { MessagingModuleConfiguration.shared.messageDataProvider.getMessageID(it) }
deletedMessageIDs.forEach {
MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(it)
val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { messagingModule.messageDataProvider.getMessageID(it, threadId) }
deletedMessageIDs.forEach { (messageId, isSms) ->
MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms)
}
}.fail {
Log.d("Loki", "Failed to get deleted messages for group chat with ID: ${openGroup.channel} on server: ${openGroup.server}.")

View File

@@ -7,7 +7,10 @@ import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
import org.session.libsession.messaging.open_groups.OpenGroupV2
import org.session.libsession.messaging.threads.Address
import org.session.libsession.utilities.GroupUtil
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.service.loki.database.LokiMessageDatabaseProtocol
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.utilities.successBackground
import java.util.concurrent.ScheduledExecutorService
@@ -101,10 +104,17 @@ class OpenGroupV2Poller(private val openGroup: OpenGroupV2, private val executor
}
private fun pollForDeletedMessages() {
val messagingModule = MessagingModuleConfiguration.shared
val address = GroupUtil.getEncodedOpenGroupID(openGroup.id.toByteArray())
val threadId = messagingModule.storage.getThreadIdFor(Address.fromSerialized(address)) ?: return
OpenGroupAPIV2.getDeletedMessages(openGroup.room, openGroup.server).success { deletedMessageServerIDs ->
val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { MessagingModuleConfiguration.shared.messageDataProvider.getMessageID(it) }
deletedMessageIDs.forEach {
MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(it)
val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { serverId ->
messagingModule.messageDataProvider.getMessageID(serverId.deletedMessageId, threadId)
}
deletedMessageIDs.forEach { (messageId, isSms) ->
MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms)
}
}.fail {
Log.d("Loki", "Failed to get deleted messages for group chat with ID: ${openGroup.room} on server: ${openGroup.server}.")