feat: syncing / joining / leaving working on open group v2

This commit is contained in:
jubb
2021-04-23 17:49:24 +10:00
parent a4d79ea2d3
commit 6272856ef9
13 changed files with 180 additions and 74 deletions

View File

@@ -6,6 +6,7 @@ import com.esotericsoftware.kryo.io.Output
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.fileserver.FileServerAPI
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.opengroups.OpenGroupAPIV2
import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.messaging.utilities.DotNetAPI
import org.session.libsignal.service.api.crypto.AttachmentCipherOutputStream
@@ -52,10 +53,15 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
var shouldEncrypt = true
val usePadding = false
val openGroup = MessagingConfiguration.shared.storage.getOpenGroup(threadID)
val openGroupV2 = MessagingConfiguration.shared.storage.getV2OpenGroup(threadID)
openGroup?.let {
server = it.server
shouldEncrypt = false
}
openGroupV2?.let {
server = it.server
shouldEncrypt = false
}
val attachmentKey = Util.getSecretBytes(64)
val paddedLength = if (usePadding) PaddingInputStream.getPaddedSize(attachment.length) else attachment.length
@@ -65,7 +71,11 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
val outputStreamFactory = if (shouldEncrypt) AttachmentCipherOutputStreamFactory(attachmentKey) else PlaintextOutputStreamFactory()
val attachmentData = PushAttachmentData(attachment.contentType, dataStream, ciphertextLength, outputStreamFactory, attachment.listener)
val uploadResult = FileServerAPI.shared.uploadAttachment(server, attachmentData)
val uploadResult = if (openGroupV2 == null) FileServerAPI.shared.uploadAttachment(server, attachmentData) else {
val dataBytes = attachmentData.data.readBytes()
val result = OpenGroupAPIV2.upload(dataBytes, openGroupV2.room, openGroupV2.server).get()
DotNetAPI.UploadResult(result, "${openGroupV2.server}/files/$result", byteArrayOf())
}
handleSuccess(attachment, attachmentKey, uploadResult)
} catch (e: java.lang.Exception) {

View File

@@ -5,6 +5,9 @@ import org.session.libsession.messaging.threads.Address
import org.session.libsession.utilities.GroupUtil
import org.session.libsignal.service.loki.utilities.toHexString
typealias OpenGroupModel = org.session.libsession.messaging.opengroups.OpenGroup
typealias OpenGroupV2Model = org.session.libsession.messaging.opengroups.OpenGroupV2
sealed class Destination {
class Contact(var publicKey: String) : Destination() {
@@ -16,6 +19,9 @@ sealed class Destination {
class OpenGroup(var channel: Long, var server: String) : Destination() {
internal constructor(): this(0, "")
}
class OpenGroupV2(var room: String, var server: String): Destination() {
internal constructor(): this("", "")
}
companion object {
fun from(address: Address): Destination {
@@ -29,9 +35,13 @@ sealed class Destination {
ClosedGroup(groupPublicKey)
}
address.isOpenGroup -> {
val threadID = MessagingConfiguration.shared.storage.getThreadID(address.contactIdentifier())!!
val openGroup = MessagingConfiguration.shared.storage.getOpenGroup(threadID)!!
OpenGroup(openGroup.channel, openGroup.server)
val storage = MessagingConfiguration.shared.storage
val threadID = storage.getThreadID(address.contactIdentifier())!!
when (val openGroup = storage.getOpenGroup(threadID) ?: storage.getV2OpenGroup(threadID)) {
is OpenGroupModel -> OpenGroup(openGroup.channel, openGroup.server)
is OpenGroupV2Model -> OpenGroupV2(openGroup.room, openGroup.server)
else -> throw Exception("Invalid OpenGroup $openGroup")
}
}
else -> {
throw Exception("TODO: Handle legacy closed groups.")

View File

@@ -21,6 +21,7 @@ import org.session.libsignal.service.loki.utilities.DownloadUtilities
import org.session.libsignal.service.loki.utilities.removing05PrefixIfNeeded
import org.session.libsignal.service.loki.utilities.toHexString
import org.session.libsignal.utilities.Base64.*
import org.session.libsignal.utilities.Hex
import org.session.libsignal.utilities.JsonUtil
import org.session.libsignal.utilities.createContext
import org.session.libsignal.utilities.logging.Log
@@ -58,6 +59,11 @@ object OpenGroupAPIV2 {
val imageID: String?
)
data class CompactPollResult(val messages: List<OpenGroupMessageV2>,
val deletions: List<Long>,
val moderators: List<String>
)
data class Request(
val verb: HTTP.Verb,
val room: String?,
@@ -224,10 +230,10 @@ object OpenGroupAPIV2 {
fun send(message: OpenGroupMessageV2, room: String, server: String): Promise<OpenGroupMessageV2, Exception> {
val signedMessage = message.sign() ?: return Promise.ofFail(Error.SIGNING_FAILED)
val json = signedMessage.toJSON()
val request = Request(verb = POST, room = room, server = server, endpoint = "messages", parameters = json)
return send(request).map(sharedContext) {
@Suppress("UNCHECKED_CAST") val rawMessage = json["message"] as? Map<String, String>
val jsonMessage = signedMessage.toJSON()
val request = Request(verb = POST, room = room, server = server, endpoint = "messages", parameters = jsonMessage)
return send(request).map(sharedContext) { json ->
@Suppress("UNCHECKED_CAST") val rawMessage = json["message"] as? Map<String, Any>
?: throw Error.PARSING_FAILED
OpenGroupMessageV2.fromJSON(rawMessage) ?: throw Error.PARSING_FAILED
}
@@ -249,21 +255,25 @@ object OpenGroupAPIV2 {
var currentMax = lastMessageServerId
val messages = rawMessages.mapNotNull { json ->
val message = OpenGroupMessageV2.fromJSON(json) ?: return@mapNotNull null
if (message.serverID == null || message.sender.isNullOrEmpty()) return@mapNotNull null
val sender = message.sender
val data = decode(message.base64EncodedData)
val signature = decode(message.base64EncodedSignature)
val publicKey = sender.removing05PrefixIfNeeded().encodeToByteArray()
val isValid = curve.verifySignature(publicKey, data, signature)
if (!isValid) {
Log.d("Loki", "Ignoring message with invalid signature")
return@mapNotNull null
try {
val message = OpenGroupMessageV2.fromJSON(json) ?: return@mapNotNull null
if (message.serverID == null || message.sender.isNullOrEmpty()) return@mapNotNull null
val sender = message.sender
val data = decode(message.base64EncodedData)
val signature = decode(message.base64EncodedSignature)
val publicKey = Hex.fromStringCondensed(sender.removing05PrefixIfNeeded())
val isValid = curve.verifySignature(publicKey, data, signature)
if (!isValid) {
Log.d("Loki", "Ignoring message with invalid signature")
return@mapNotNull null
}
if (message.serverID > lastMessageServerId) {
currentMax = message.serverID
}
message
} catch (e: Exception) {
null
}
if (message.serverID > lastMessageServerId) {
currentMax = message.serverID
}
message
}
storage.setLastMessageServerId(room, server, currentMax)
messages
@@ -332,6 +342,10 @@ object OpenGroupAPIV2 {
// endregion
// region General
// fun getCompactPoll(): Promise<CompactPollResult, Exception> {
// val request = Request()
// }
fun getDefaultRoomsIfNeeded(): Promise<List<DefaultGroup>, Exception> {
val storage = MessagingConfiguration.shared.storage
storage.setOpenGroupPublicKey(DEFAULT_SERVER, DEFAULT_SERVER_PUBLIC_KEY)

View File

@@ -8,14 +8,14 @@ import org.session.libsignal.utilities.logging.Log
import org.whispersystems.curve25519.Curve25519
data class OpenGroupMessageV2(
val serverID: Long?,
val serverID: Long? = null,
val sender: String?,
val sentTimestamp: Long,
// The serialized protobuf in base64 encoding
val base64EncodedData: String,
// When sending a message, the sender signs the serialized protobuf with their private key so that
// a receiving user can verify that the message wasn't tampered with.
val base64EncodedSignature: String?
val base64EncodedSignature: String? = null
) {
companion object {
@@ -24,10 +24,10 @@ data class OpenGroupMessageV2(
fun fromJSON(json: Map<String, Any>): OpenGroupMessageV2? {
val base64EncodedData = json["data"] as? String ?: return null
val sentTimestamp = json["timestamp"] as? Long ?: return null
val serverID = json["server_id"] as? Long
val serverID = json["server_id"] as? Int
val sender = json["public_key"] as? String
val base64EncodedSignature = json["signature"] as? String
return OpenGroupMessageV2(serverID = serverID,
return OpenGroupMessageV2(serverID = serverID?.toLong(),
sender = sender,
sentTimestamp = sentTimestamp,
base64EncodedData = base64EncodedData,
@@ -44,7 +44,7 @@ data class OpenGroupMessageV2(
if (sender != publicKey) return null // only sign our own messages?
val signature = try {
curve.calculateSignature(privateKey, Base64.decode(base64EncodedData))
curve.calculateSignature(privateKey, decode(base64EncodedData))
} catch (e: Exception) {
Log.e("Loki", "Couldn't sign OpenGroupV2Message", e)
return null

View File

@@ -13,7 +13,9 @@ import org.session.libsession.messaging.messages.control.ConfigurationMessage
import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
import org.session.libsession.messaging.messages.visible.*
import org.session.libsession.messaging.opengroups.OpenGroupAPI
import org.session.libsession.messaging.opengroups.OpenGroupAPIV2
import org.session.libsession.messaging.opengroups.OpenGroupMessage
import org.session.libsession.messaging.opengroups.OpenGroupMessageV2
import org.session.libsession.messaging.threads.Address
import org.session.libsession.messaging.utilities.MessageWrapper
import org.session.libsession.snode.RawResponsePromise
@@ -63,7 +65,7 @@ object MessageSender {
// Convenience
fun send(message: Message, destination: Destination): Promise<Unit, Exception> {
if (destination is Destination.OpenGroup) {
if (destination is Destination.OpenGroup || destination is Destination.OpenGroupV2) {
return sendToOpenGroupDestination(destination, message)
}
return sendToSnodeDestination(destination, message)
@@ -91,7 +93,8 @@ object MessageSender {
when (destination) {
is Destination.Contact -> message.recipient = destination.publicKey
is Destination.ClosedGroup -> message.recipient = destination.groupPublicKey
is Destination.OpenGroup -> throw Error.PreconditionFailure("Destination should not be open groups!")
is Destination.OpenGroup,
is Destination.OpenGroupV2 -> throw Error.PreconditionFailure("Destination should not be open groups!")
}
// Validate the message
if (!message.isValid()) { throw Error.InvalidMessage }
@@ -129,7 +132,8 @@ object MessageSender {
val encryptionKeyPair = MessagingConfiguration.shared.storage.getLatestClosedGroupEncryptionKeyPair(destination.groupPublicKey)!!
ciphertext = MessageSenderEncryption.encryptWithSessionProtocol(plaintext, encryptionKeyPair.hexEncodedPublicKey)
}
is Destination.OpenGroup -> throw Error.PreconditionFailure("Destination should not be open groups!")
is Destination.OpenGroup,
is Destination.OpenGroupV2 -> throw Error.PreconditionFailure("Destination should not be open groups!")
}
// Wrap the result
val kind: SignalServiceProtos.Envelope.Type
@@ -143,7 +147,8 @@ object MessageSender {
kind = SignalServiceProtos.Envelope.Type.CLOSED_GROUP_CIPHERTEXT
senderPublicKey = destination.groupPublicKey
}
is Destination.OpenGroup -> throw Error.PreconditionFailure("Destination should not be open groups!")
is Destination.OpenGroup,
is Destination.OpenGroupV2 -> throw Error.PreconditionFailure("Destination should not be open groups!")
}
val wrappedMessage = MessageWrapper.wrap(kind, message.sentTimestamp!!, senderPublicKey, ciphertext)
// Calculate proof of work
@@ -207,32 +212,59 @@ object MessageSender {
deferred.reject(error)
}
try {
val server: String
val channel: Long
when (destination) {
is Destination.Contact -> throw Error.PreconditionFailure("Destination should not be contacts!")
is Destination.ClosedGroup -> throw Error.PreconditionFailure("Destination should not be closed groups!")
is Destination.OpenGroup -> {
message.recipient = "${destination.server}.${destination.channel}"
server = destination.server
channel = destination.channel
val server = destination.server
val channel = destination.channel
// Validate the message
if (message !is VisibleMessage || !message.isValid()) {
throw Error.InvalidMessage
}
// Convert the message to an open group message
val openGroupMessage = OpenGroupMessage.from(message, server) ?: run {
throw Error.InvalidMessage
}
// Send the result
OpenGroupAPI.sendMessage(openGroupMessage, channel, server).success {
message.openGroupServerMessageID = it.serverID
handleSuccessfulMessageSend(message, destination)
deferred.resolve(Unit)
}.fail {
handleFailure(it)
}
}
is Destination.OpenGroupV2 -> {
message.recipient = "${destination.server}.${destination.room}"
val server = destination.server
val room = destination.room
// Validate the message
if (message !is VisibleMessage || !message.isValid()) {
throw Error.InvalidMessage
}
val proto = message.toProto()!!
val openGroupMessage = OpenGroupMessageV2(
sender = message.sender,
sentTimestamp = message.sentTimestamp!!,
base64EncodedData = Base64.encodeBytes(PushTransportDetails.getPaddedMessageBody(proto.dataMessage!!.toByteArray())),
)
OpenGroupAPIV2.send(openGroupMessage,room,server).success {
message.openGroupServerMessageID = it.serverID
handleSuccessfulMessageSend(message, destination)
deferred.resolve(Unit)
}.fail {
handleFailure(it)
}
}
}
// Validate the message
if (message !is VisibleMessage || !message.isValid()) {
throw Error.InvalidMessage
}
// Convert the message to an open group message
val openGroupMessage = OpenGroupMessage.from(message, server) ?: kotlin.run {
throw Error.InvalidMessage
}
// Send the result
OpenGroupAPI.sendMessage(openGroupMessage, channel, server).success {
message.openGroupServerMessageID = it.serverID
handleSuccessfulMessageSend(message, destination)
deferred.resolve(Unit)
}.fail {
handleFailure(it)
}
} catch (exception: Exception) {
handleFailure(exception)

View File

@@ -71,9 +71,6 @@ class ClosedGroupPoller {
// ignore inactive group's messages
return@successBackground
}
if (messages.isNotEmpty()) {
Log.d("Loki", "Received ${messages.count()} new message(s) in closed group with public key: $publicKey.")
}
messages.forEach { message ->
val rawMessageAsJSON = message as? Map<*, *>
val base64EncodedData = rawMessageAsJSON?.get("data") as? String

View File

@@ -72,7 +72,6 @@ class OpenGroupPoller(private val openGroup: OpenGroup, private val executorServ
// Kovenant propagates a context to chained promises, so OpenGroupAPI.sharedContext should be used for all of the below
OpenGroupAPI.getMessages(openGroup.channel, openGroup.server).successBackground { messages ->
// Process messages in the background
Log.d("Loki", "received ${messages.size} messages")
messages.forEach { message ->
try {
val senderPublicKey = message.senderPublicKey

View File

@@ -62,7 +62,6 @@ class OpenGroupV2Poller(private val openGroup: OpenGroupV2, private val executor
// Kovenant propagates a context to chained promises, so OpenGroupAPI.sharedContext should be used for all of the below
OpenGroupAPIV2.getMessages(openGroup.room, openGroup.server).successBackground { messages ->
// Process messages in the background
Log.d("Loki", "received ${messages.size} messages")
messages.forEach { message ->
try {
val senderPublicKey = message.sender!!

View File

@@ -5,14 +5,10 @@ import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map
import nl.komponents.kovenant.then
import okhttp3.*
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.fileserver.FileServerAPI
import org.session.libsession.snode.OnionRequestAPI
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.messaging.fileserver.FileServerAPI
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.utilities.DiffieHellman
import org.session.libsignal.service.api.crypto.ProfileCipherOutputStream
import org.session.libsignal.service.api.messages.SignalServiceAttachment
import org.session.libsignal.service.api.push.exceptions.NonSuccessfulResponseCodeException
@@ -22,13 +18,12 @@ import org.session.libsignal.service.internal.push.ProfileAvatarData
import org.session.libsignal.service.internal.push.PushAttachmentData
import org.session.libsignal.service.internal.push.http.DigestingRequestBody
import org.session.libsignal.service.internal.push.http.ProfileCipherOutputStreamFactory
import org.session.libsignal.utilities.Hex
import org.session.libsignal.utilities.JsonUtil
import org.session.libsignal.service.loki.api.utilities.HTTP
import org.session.libsignal.service.loki.utilities.*
import org.session.libsignal.service.loki.utilities.removing05PrefixIfNeeded
import org.session.libsignal.service.loki.utilities.retryIfNeeded
import org.session.libsignal.utilities.*
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.logging.Log
import java.io.File
import java.io.FileOutputStream
import java.io.OutputStream
@@ -270,13 +265,13 @@ open class DotNetAPI {
return upload(server, request) { json -> // Retrying is handled by AttachmentUploadJob
val data = json["data"] as? Map<*, *>
if (data == null) {
Log.d("Loki", "Couldn't parse attachment from: $json.")
Log.e("Loki", "Couldn't parse attachment from: $json.")
throw Error.ParsingFailed
}
val id = data["id"] as? Long ?: (data["id"] as? Int)?.toLong() ?: (data["id"] as? String)?.toLong()
val url = data["url"] as? String
if (id == null || url == null || url.isEmpty()) {
Log.d("Loki", "Couldn't parse upload from: $json.")
Log.e("Loki", "Couldn't parse upload from: $json.")
throw Error.ParsingFailed
}
UploadResult(id, url, file.transmittedDigest)