feat: add open group v2 storage and db methods, starting on new open group v2 poller

This commit is contained in:
jubb
2021-04-13 17:17:16 +10:00
parent 201dde7412
commit 0eadc55325
12 changed files with 485 additions and 115 deletions

View File

@@ -10,6 +10,7 @@ import org.session.libsession.messaging.messages.control.ConfigurationMessage
import org.session.libsession.messaging.messages.visible.Attachment
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.messaging.opengroups.OpenGroup
import org.session.libsession.messaging.opengroups.OpenGroupV2
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentId
import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment
import org.session.libsession.messaging.sending_receiving.linkpreview.LinkPreview
@@ -51,14 +52,16 @@ interface StorageProtocol {
fun isJobCanceled(job: Job): Boolean
// Authorization
fun getAuthToken(server: String): String?
fun setAuthToken(server: String, newValue: String?)
fun removeAuthToken(server: String)
fun getAuthToken(room: String, server: String): String?
fun setAuthToken(room: String, server: String, newValue: String)
fun removeAuthToken(room: String, server: String)
// Open Groups
fun getAllV2OpenGroups(): Map<Long, OpenGroupV2>
fun getV2OpenGroup(threadId: String): OpenGroupV2?
// Open Groups
fun getOpenGroup(threadID: String): OpenGroup?
fun getThreadID(openGroupID: String): String?
fun getAllOpenGroups(): Map<Long, OpenGroup>
fun addOpenGroup(server: String, channel: Long)
fun setOpenGroupServerMessageID(messageID: Long, serverID: Long)
fun getQuoteServerID(quoteID: Long, publicKey: String): Long?
@@ -72,21 +75,19 @@ interface StorageProtocol {
fun getOpenGroupDisplayName(publicKey: String, channel: Long, server: String): String?
// Open Group Metadata
fun setUserCount(group: Long, server: String, newValue: Int)
fun setOpenGroupProfilePictureURL(group: Long, server: String, newValue: String)
fun getOpenGroupProfilePictureURL(group: Long, server: String): String?
fun updateTitle(groupID: String, newValue: String)
fun updateProfilePicture(groupID: String, newValue: ByteArray)
// Last Message Server ID
fun getLastMessageServerID(group: Long, server: String): Long?
fun setLastMessageServerID(group: Long, server: String, newValue: Long)
fun removeLastMessageServerID(group: Long, server: String)
fun getLastMessageServerId(room: String, server: String): Long?
fun setLastMessageServerId(room: String, server: String, newValue: Long)
fun removeLastMessageServerId(room: String, server: String)
// Last Deletion Server ID
fun getLastDeletionServerID(group: Long, server: String): Long?
fun setLastDeletionServerID(group: Long, server: String, newValue: Long)
fun removeLastDeletionServerID(group: Long, server: String)
fun getLastDeletionServerId(room: String, server: String): Long?
fun setLastDeletionServerId(room: String, server: String, newValue: Long)
fun removeLastDeletionServerId(room: String, server: String)
// Message Handling
fun isMessageDuplicated(timestamp: Long, sender: String): Boolean
@@ -158,4 +159,25 @@ interface StorageProtocol {
// Message Handling
/// Returns the ID of the `TSIncomingMessage` that was constructed.
fun persist(message: VisibleMessage, quotes: QuoteModel?, linkPreview: List<LinkPreview?>, groupPublicKey: String?, openGroupID: String?, attachments: List<Attachment>): Long?
// DEPRECATED
fun getAuthToken(server: String): String?
fun setAuthToken(server: String, newValue: String?)
fun removeAuthToken(server: String)
fun getLastMessageServerID(group: Long, server: String): Long?
fun setLastMessageServerID(group: Long, server: String, newValue: Long)
fun removeLastMessageServerID(group: Long, server: String)
fun getLastDeletionServerID(group: Long, server: String): Long?
fun setLastDeletionServerID(group: Long, server: String, newValue: Long)
fun removeLastDeletionServerID(group: Long, server: String)
fun getOpenGroup(threadID: String): OpenGroup?
fun getAllOpenGroups(): Map<Long, OpenGroup>
fun setUserCount(group: Long, server: String, newValue: Int)
fun setOpenGroupProfilePictureURL(group: Long, server: String, newValue: String)
fun getOpenGroupProfilePictureURL(group: Long, server: String): String?
}

View File

@@ -0,0 +1,37 @@
package org.session.libsession.messaging.opengroups
import org.session.libsession.messaging.opengroups.OpenGroupAPIV2.Error
import org.session.libsession.messaging.utilities.DotNetAPI
import java.util.*
class OpenGroupAPIV2: DotNetAPI() {
enum class Error {
GENERIC,
PARSING_FAILED,
DECRYPTION_FAILED,
SIGNING_FAILED,
INVALID_URL,
NO_PUBLIC_KEY
}
companion object {
private val moderators: HashMap<String, HashMap<String, Set<String>>> = hashMapOf() // Server URL to (channel ID to set of moderator IDs)
const val DEFAULT_SERVER = "https://sessionopengroup.com"
const val DEFAULT_SERVER_PUBLIC_KEY = "658d29b91892a2389505596b135e76a53db6e11d613a51dbd3d0816adffb231b"
}
}
data class Info(val id: String, val name: String, val imageId: String?)
fun Error.errorDescription() = when (this) {
Error.GENERIC -> "An error occurred."
Error.PARSING_FAILED -> "Invalid response."
Error.DECRYPTION_FAILED -> "Couldn't decrypt response."
Error.SIGNING_FAILED -> "Couldn't sign message."
Error.INVALID_URL -> "Invalid URL."
Error.NO_PUBLIC_KEY -> "Couldn't find server public key."
}

View File

@@ -0,0 +1,45 @@
package org.session.libsession.messaging.opengroups
import org.session.libsignal.utilities.JsonUtil
import java.util.*
data class OpenGroupV2(
val server: String,
val room: String,
val id: String,
val name: String,
val publicKey: String,
val imageId: String?
) {
constructor(server: String, room: String, name: String, publicKey: String, imageId: String?) : this(
server = server,
room = room,
id = "$server.$room",
name = name,
publicKey = publicKey,
imageId = imageId
)
companion object {
fun fromJson(jsonAsString: String): OpenGroupV2? {
return try {
val json = JsonUtil.fromJson(jsonAsString)
if (!json.has("room")) return null
val room = json.get("room").asText().toLowerCase(Locale.getDefault())
val server = json.get("server").asText().toLowerCase(Locale.getDefault())
val displayName = json.get("displayName").asText()
val publicKey = json.get("publicKey").asText()
val imageId = json.get("imageId").asText().let { str -> if (str.isEmpty()) null else str }
OpenGroupV2(server, room, displayName, publicKey, imageId)
} catch (e: Exception) {
null
}
}
}
}

View File

@@ -105,8 +105,10 @@ private fun MessageReceiver.handleConfigurationMessage(message: ConfigurationMes
handleNewClosedGroup(message.sender!!, message.sentTimestamp!!, closeGroup.publicKey, closeGroup.name, closeGroup.encryptionKeyPair!!, closeGroup.members, closeGroup.admins, message.sentTimestamp!!)
}
val allOpenGroups = storage.getAllOpenGroups().map { it.value.server }
val allV2OpenGroups = storage.getAllV2OpenGroups().map { it.value.server }
for (openGroup in message.openGroups) {
if (allOpenGroups.contains(openGroup)) continue
// TODO: add in v2
storage.addOpenGroup(openGroup, 1)
}
if (message.displayName.isNotEmpty()) {

View File

@@ -0,0 +1,225 @@
package org.session.libsession.messaging.sending_receiving.pollers
import com.google.protobuf.ByteString
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.opengroups.*
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.utilities.successBackground
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
class OpenGroupV2Poller(private val openGroup: OpenGroupV2, private val executorService: ScheduledExecutorService? = null) {
private var hasStarted = false
@Volatile private var isPollOngoing = false
var isCaughtUp = false
private val cancellableFutures = mutableListOf<ScheduledFuture<out Any>>()
// region Convenience
private val userHexEncodedPublicKey = MessagingConfiguration.shared.storage.getUserPublicKey() ?: ""
private var displayNameUpdates = setOf<String>()
// endregion
// region Settings
companion object {
private val pollForNewMessagesInterval: Long = 10 * 1000
private val pollForDeletedMessagesInterval: Long = 60 * 1000
private val pollForModeratorsInterval: Long = 10 * 60 * 1000
private val pollForDisplayNamesInterval: Long = 60 * 1000
}
// endregion
// region Lifecycle
fun startIfNeeded() {
if (hasStarted || executorService == null) return
cancellableFutures += listOf(
executorService.scheduleAtFixedRate(::pollForNewMessages,0, pollForNewMessagesInterval, TimeUnit.MILLISECONDS),
executorService.scheduleAtFixedRate(::pollForDeletedMessages,0, pollForDeletedMessagesInterval, TimeUnit.MILLISECONDS),
executorService.scheduleAtFixedRate(::pollForModerators,0, pollForModeratorsInterval, TimeUnit.MILLISECONDS),
executorService.scheduleAtFixedRate(::pollForDisplayNames,0, pollForDisplayNamesInterval, TimeUnit.MILLISECONDS)
)
hasStarted = true
}
fun stop() {
cancellableFutures.forEach { future ->
future.cancel(false)
}
cancellableFutures.clear()
hasStarted = false
}
// endregion
// region Polling
fun pollForNewMessages(): Promise<Unit, Exception> {
return pollForNewMessages(false)
}
private fun pollForNewMessages(isBackgroundPoll: Boolean): Promise<Unit, Exception> {
if (isPollOngoing) { return Promise.of(Unit) }
isPollOngoing = true
val deferred = deferred<Unit, Exception>()
// Kovenant propagates a context to chained promises, so OpenGroupAPI.sharedContext should be used for all of the below
OpenGroupAPIV2.getMessages(openGroup.room, openGroup.server).successBackground { messages ->
// Process messages in the background
Log.d("Loki", "received ${messages.size} messages")
messages.forEach { message ->
try {
val senderPublicKey = message.senderPublicKey
fun generateDisplayName(rawDisplayName: String): String {
return "$rawDisplayName (...${senderPublicKey.takeLast(8)})"
}
val senderDisplayName = MessagingConfiguration.shared.storage.getOpenGroupDisplayName(senderPublicKey, openGroup.channel, openGroup.server) ?: generateDisplayName(message.displayName)
val id = openGroup.id.toByteArray()
// Main message
val dataMessageProto = SignalServiceProtos.DataMessage.newBuilder()
val body = if (message.body == message.timestamp.toString()) { "" } else { message.body }
dataMessageProto.setBody(body)
dataMessageProto.setTimestamp(message.timestamp)
// Attachments
val attachmentProtos = message.attachments.mapNotNull { attachment ->
try {
if (attachment.kind != OpenGroupMessage.Attachment.Kind.Attachment) { return@mapNotNull null }
val attachmentProto = SignalServiceProtos.AttachmentPointer.newBuilder()
attachmentProto.setId(attachment.serverID)
attachmentProto.setContentType(attachment.contentType)
attachmentProto.setSize(attachment.size)
attachmentProto.setFileName(attachment.fileName)
attachmentProto.setFlags(attachment.flags)
attachmentProto.setWidth(attachment.width)
attachmentProto.setHeight(attachment.height)
attachment.caption?.let { attachmentProto.setCaption(it) }
attachmentProto.setUrl(attachment.url)
attachmentProto.build()
} catch (e: Exception) {
Log.e("Loki","Failed to parse attachment as proto",e)
null
}
}
dataMessageProto.addAllAttachments(attachmentProtos)
// Link preview
val linkPreview = message.attachments.firstOrNull { it.kind == OpenGroupMessage.Attachment.Kind.LinkPreview }
if (linkPreview != null) {
val linkPreviewProto = SignalServiceProtos.DataMessage.Preview.newBuilder()
linkPreviewProto.setUrl(linkPreview.linkPreviewURL!!)
linkPreviewProto.setTitle(linkPreview.linkPreviewTitle!!)
val attachmentProto = SignalServiceProtos.AttachmentPointer.newBuilder()
attachmentProto.setId(linkPreview.serverID)
attachmentProto.setContentType(linkPreview.contentType)
attachmentProto.setSize(linkPreview.size)
attachmentProto.setFileName(linkPreview.fileName)
attachmentProto.setFlags(linkPreview.flags)
attachmentProto.setWidth(linkPreview.width)
attachmentProto.setHeight(linkPreview.height)
linkPreview.caption?.let { attachmentProto.setCaption(it) }
attachmentProto.setUrl(linkPreview.url)
linkPreviewProto.setImage(attachmentProto.build())
dataMessageProto.addPreview(linkPreviewProto.build())
}
// Quote
val quote = message.quote
if (quote != null) {
val quoteProto = SignalServiceProtos.DataMessage.Quote.newBuilder()
quoteProto.setId(quote.quotedMessageTimestamp)
quoteProto.setAuthor(quote.quoteePublicKey)
if (quote.quotedMessageBody != quote.quotedMessageTimestamp.toString()) { quoteProto.setText(quote.quotedMessageBody) }
dataMessageProto.setQuote(quoteProto.build())
}
val messageServerID = message.serverID
// Profile
val profileProto = SignalServiceProtos.DataMessage.LokiProfile.newBuilder()
profileProto.setDisplayName(senderDisplayName)
val profilePicture = message.profilePicture
if (profilePicture != null) {
profileProto.setProfilePicture(profilePicture.url)
dataMessageProto.setProfileKey(ByteString.copyFrom(profilePicture.profileKey))
}
dataMessageProto.setProfile(profileProto.build())
/* TODO: the signal service proto needs to be synced with iOS
// Open group info
if (messageServerID != null) {
val openGroupProto = PublicChatInfo.newBuilder()
openGroupProto.setServerID(messageServerID)
dataMessageProto.setPublicChatInfo(openGroupProto.build())
}
*/
// Signal group context
val groupProto = SignalServiceProtos.GroupContext.newBuilder()
groupProto.setId(ByteString.copyFrom(id))
groupProto.setType(SignalServiceProtos.GroupContext.Type.DELIVER)
groupProto.setName(openGroup.displayName)
dataMessageProto.setGroup(groupProto.build())
// Content
val content = SignalServiceProtos.Content.newBuilder()
content.setDataMessage(dataMessageProto.build())
// Envelope
val builder = SignalServiceProtos.Envelope.newBuilder()
builder.type = SignalServiceProtos.Envelope.Type.UNIDENTIFIED_SENDER
builder.source = senderPublicKey
builder.sourceDevice = 1
builder.setContent(content.build().toByteString())
builder.timestamp = message.timestamp
builder.serverTimestamp = message.serverTimestamp
val envelope = builder.build()
val job = MessageReceiveJob(envelope.toByteArray(), isBackgroundPoll, messageServerID, openGroup.id)
Log.d("Loki", "Scheduling Job $job")
if (isBackgroundPoll) {
job.executeAsync().always { deferred.resolve(Unit) }
// The promise is just used to keep track of when we're done
} else {
JobQueue.shared.add(job)
}
} catch (e: Exception) {
Log.e("Loki", "Exception parsing message", e)
}
}
displayNameUpdates = displayNameUpdates + messages.map { it.senderPublicKey }.toSet() - userHexEncodedPublicKey
executorService?.schedule(::pollForDisplayNames, 0, TimeUnit.MILLISECONDS)
isCaughtUp = true
isPollOngoing = false
deferred.resolve(Unit)
}.fail {
Log.d("Loki", "Failed to get messages for group chat with ID: ${openGroup.channel} on server: ${openGroup.server}.")
isPollOngoing = false
}
return deferred.promise
}
private fun pollForDisplayNames() {
if (displayNameUpdates.isEmpty()) { return }
val hexEncodedPublicKeys = displayNameUpdates
displayNameUpdates = setOf()
OpenGroupAPI.getDisplayNames(hexEncodedPublicKeys, openGroup.server).successBackground { mapping ->
for (pair in mapping.entries) {
if (pair.key == userHexEncodedPublicKey) continue
val senderDisplayName = "${pair.value} (...${pair.key.substring(pair.key.count() - 8)})"
MessagingConfiguration.shared.storage.setOpenGroupDisplayName(pair.key, openGroup.channel, openGroup.server, senderDisplayName)
}
}.fail {
displayNameUpdates = displayNameUpdates.union(hexEncodedPublicKeys)
}
}
private fun pollForDeletedMessages() {
OpenGroupAPI.getDeletedMessageServerIDs(openGroup.channel, openGroup.server).success { deletedMessageServerIDs ->
val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { MessagingConfiguration.shared.messageDataProvider.getMessageID(it) }
deletedMessageIDs.forEach {
MessagingConfiguration.shared.messageDataProvider.deleteMessage(it)
}
}.fail {
Log.d("Loki", "Failed to get deleted messages for group chat with ID: ${openGroup.channel} on server: ${openGroup.server}.")
}
}
private fun pollForModerators() {
OpenGroupAPI.getModerators(openGroup.channel, openGroup.server)
}
// endregion
}