mirror of
https://github.com/oxen-io/session-android.git
synced 2025-08-12 14:27:44 +00:00
Merge branch 'dev' of https://github.com/loki-project/session-android into open-group-invitations
This commit is contained in:
@@ -11,7 +11,8 @@ import java.io.InputStream
|
||||
interface MessageDataProvider {
|
||||
|
||||
fun getMessageID(serverID: Long): Long?
|
||||
fun deleteMessage(messageID: Long)
|
||||
fun getMessageID(serverId: Long, threadId: Long): Pair<Long, Boolean>?
|
||||
fun deleteMessage(messageID: Long, isSms: Boolean)
|
||||
|
||||
fun getDatabaseAttachment(attachmentId: Long): DatabaseAttachment?
|
||||
|
||||
|
@@ -10,6 +10,7 @@ import org.session.libsession.messaging.messages.control.ConfigurationMessage
|
||||
import org.session.libsession.messaging.messages.visible.Attachment
|
||||
import org.session.libsession.messaging.messages.visible.VisibleMessage
|
||||
import org.session.libsession.messaging.open_groups.OpenGroup
|
||||
import org.session.libsession.messaging.open_groups.OpenGroupV2
|
||||
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentId
|
||||
import org.session.libsession.messaging.sending_receiving.data_extraction.DataExtractionNotificationInfoMessage
|
||||
import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment
|
||||
@@ -52,16 +53,18 @@ interface StorageProtocol {
|
||||
fun isJobCanceled(job: Job): Boolean
|
||||
|
||||
// Authorization
|
||||
fun getAuthToken(server: String): String?
|
||||
fun setAuthToken(server: String, newValue: String?)
|
||||
fun removeAuthToken(server: String)
|
||||
fun getAuthToken(room: String, server: String): String?
|
||||
fun setAuthToken(room: String, server: String, newValue: String)
|
||||
fun removeAuthToken(room: String, server: String)
|
||||
|
||||
// Open Groups
|
||||
fun getAllV2OpenGroups(): Map<Long, OpenGroupV2>
|
||||
fun getV2OpenGroup(threadId: String): OpenGroupV2?
|
||||
|
||||
// Open Groups
|
||||
fun getOpenGroup(threadID: String): OpenGroup?
|
||||
fun getThreadID(openGroupID: String): String?
|
||||
fun getAllOpenGroups(): Map<Long, OpenGroup>
|
||||
fun addOpenGroup(server: String, channel: Long)
|
||||
fun setOpenGroupServerMessageID(messageID: Long, serverID: Long)
|
||||
fun addOpenGroup(serverUrl: String, channel: Long)
|
||||
fun setOpenGroupServerMessageID(messageID: Long, serverID: Long, threadID: Long, isSms: Boolean)
|
||||
fun getQuoteServerID(quoteID: Long, publicKey: String): Long?
|
||||
|
||||
// Open Group Public Keys
|
||||
@@ -69,31 +72,30 @@ interface StorageProtocol {
|
||||
fun setOpenGroupPublicKey(server: String, newValue: String)
|
||||
|
||||
// Open Group User Info
|
||||
fun setOpenGroupDisplayName(publicKey: String, channel: Long, server: String, displayName: String)
|
||||
fun getOpenGroupDisplayName(publicKey: String, channel: Long, server: String): String?
|
||||
fun setOpenGroupDisplayName(publicKey: String, room: String, server: String, displayName: String)
|
||||
fun getOpenGroupDisplayName(publicKey: String, room: String, server: String): String?
|
||||
|
||||
// Open Group Metadata
|
||||
fun setUserCount(group: Long, server: String, newValue: Int)
|
||||
fun setOpenGroupProfilePictureURL(group: Long, server: String, newValue: String)
|
||||
fun getOpenGroupProfilePictureURL(group: Long, server: String): String?
|
||||
|
||||
fun updateTitle(groupID: String, newValue: String)
|
||||
fun updateProfilePicture(groupID: String, newValue: ByteArray)
|
||||
fun setUserCount(room: String, server: String, newValue: Int)
|
||||
|
||||
// Last Message Server ID
|
||||
fun getLastMessageServerID(group: Long, server: String): Long?
|
||||
fun setLastMessageServerID(group: Long, server: String, newValue: Long)
|
||||
fun removeLastMessageServerID(group: Long, server: String)
|
||||
fun getLastMessageServerId(room: String, server: String): Long?
|
||||
fun setLastMessageServerId(room: String, server: String, newValue: Long)
|
||||
fun removeLastMessageServerId(room: String, server: String)
|
||||
|
||||
// Last Deletion Server ID
|
||||
fun getLastDeletionServerID(group: Long, server: String): Long?
|
||||
fun setLastDeletionServerID(group: Long, server: String, newValue: Long)
|
||||
fun removeLastDeletionServerID(group: Long, server: String)
|
||||
fun getLastDeletionServerId(room: String, server: String): Long?
|
||||
fun setLastDeletionServerId(room: String, server: String, newValue: Long)
|
||||
fun removeLastDeletionServerId(room: String, server: String)
|
||||
|
||||
// Message Handling
|
||||
fun isMessageDuplicated(timestamp: Long, sender: String): Boolean
|
||||
fun getReceivedMessageTimestamps(): Set<Long>
|
||||
fun addReceivedMessageTimestamp(timestamp: Long)
|
||||
// fun removeReceivedMessageTimestamps(timestamps: Set<Long>)
|
||||
fun removeReceivedMessageTimestamps(timestamps: Set<Long>)
|
||||
// Returns the IDs of the saved attachments.
|
||||
fun persistAttachments(messageId: Long, attachments: List<Attachment>): List<Long>
|
||||
fun getAttachmentsForMessage(messageId: Long): List<DatabaseAttachment>
|
||||
@@ -137,6 +139,7 @@ interface StorageProtocol {
|
||||
fun getOrCreateThreadIdFor(address: Address): Long
|
||||
fun getOrCreateThreadIdFor(publicKey: String, groupPublicKey: String?, openGroupID: String?): Long
|
||||
fun getThreadIdFor(address: Address): Long?
|
||||
fun getThreadIdForMms(mmsId: Long): Long
|
||||
|
||||
// Session Request
|
||||
fun getSessionRequestSentTimestamp(publicKey: String): Long?
|
||||
@@ -164,4 +167,28 @@ interface StorageProtocol {
|
||||
|
||||
// Data Extraction Notification
|
||||
fun insertDataExtractionNotificationMessage(senderPublicKey: String, message: DataExtractionNotificationInfoMessage, sentTimestamp: Long)
|
||||
|
||||
// DEPRECATED
|
||||
fun getAuthToken(server: String): String?
|
||||
fun setAuthToken(server: String, newValue: String?)
|
||||
fun removeAuthToken(server: String)
|
||||
|
||||
fun getLastMessageServerID(group: Long, server: String): Long?
|
||||
fun setLastMessageServerID(group: Long, server: String, newValue: Long)
|
||||
fun removeLastMessageServerID(group: Long, server: String)
|
||||
|
||||
fun getLastDeletionServerID(group: Long, server: String): Long?
|
||||
fun setLastDeletionServerID(group: Long, server: String, newValue: Long)
|
||||
fun removeLastDeletionServerID(group: Long, server: String)
|
||||
|
||||
fun getOpenGroup(threadID: String): OpenGroup?
|
||||
fun getAllOpenGroups(): Map<Long, OpenGroup>
|
||||
|
||||
fun setUserCount(group: Long, server: String, newValue: Int)
|
||||
fun setOpenGroupProfilePictureURL(group: Long, server: String, newValue: String)
|
||||
fun getOpenGroupProfilePictureURL(group: Long, server: String): String?
|
||||
|
||||
fun setOpenGroupDisplayName(publicKey: String, channel: Long, server: String, displayName: String)
|
||||
fun getOpenGroupDisplayName(publicKey: String, channel: Long, server: String): String?
|
||||
|
||||
}
|
||||
|
@@ -0,0 +1,106 @@
|
||||
package org.session.libsession.messaging.file_server
|
||||
|
||||
import nl.komponents.kovenant.Promise
|
||||
import nl.komponents.kovenant.functional.bind
|
||||
import nl.komponents.kovenant.functional.map
|
||||
import okhttp3.Headers
|
||||
import okhttp3.HttpUrl
|
||||
import okhttp3.MediaType
|
||||
import okhttp3.RequestBody
|
||||
import org.session.libsession.messaging.MessagingModuleConfiguration
|
||||
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
|
||||
import org.session.libsession.snode.OnionRequestAPI
|
||||
import org.session.libsignal.service.loki.HTTP
|
||||
import org.session.libsignal.service.loki.utilities.retryIfNeeded
|
||||
import org.session.libsignal.utilities.Base64
|
||||
import org.session.libsignal.utilities.JsonUtil
|
||||
import org.session.libsignal.utilities.logging.Log
|
||||
|
||||
object FileServerAPIV2 {
|
||||
|
||||
const val DEFAULT_SERVER = "http://88.99.175.227"
|
||||
private const val DEFAULT_SERVER_PUBLIC_KEY = "7cb31905b55cd5580c686911debf672577b3fb0bff81df4ce2d5c4cb3a7aaa69"
|
||||
|
||||
sealed class Error : Exception() {
|
||||
object PARSING_FAILED : Error()
|
||||
object INVALID_URL : Error()
|
||||
|
||||
fun errorDescription() = when (this) {
|
||||
PARSING_FAILED -> "Invalid response."
|
||||
INVALID_URL -> "Invalid URL."
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
data class Request(
|
||||
val verb: HTTP.Verb,
|
||||
val endpoint: String,
|
||||
val queryParameters: Map<String, String> = mapOf(),
|
||||
val parameters: Any? = null,
|
||||
val headers: Map<String, String> = mapOf(),
|
||||
// Always `true` under normal circumstances. You might want to disable
|
||||
// this when running over Lokinet.
|
||||
val useOnionRouting: Boolean = true
|
||||
)
|
||||
|
||||
private fun createBody(parameters: Any?): RequestBody? {
|
||||
if (parameters == null) return null
|
||||
|
||||
val parametersAsJSON = JsonUtil.toJson(parameters)
|
||||
return RequestBody.create(MediaType.get("application/json"), parametersAsJSON)
|
||||
}
|
||||
|
||||
private fun send(request: Request): Promise<Map<*, *>, Exception> {
|
||||
val parsed = HttpUrl.parse(DEFAULT_SERVER) ?: return Promise.ofFail(OpenGroupAPIV2.Error.INVALID_URL)
|
||||
val urlBuilder = HttpUrl.Builder()
|
||||
.scheme(parsed.scheme())
|
||||
.host(parsed.host())
|
||||
.port(parsed.port())
|
||||
.addPathSegments(request.endpoint)
|
||||
|
||||
if (request.verb == HTTP.Verb.GET) {
|
||||
for ((key, value) in request.queryParameters) {
|
||||
urlBuilder.addQueryParameter(key, value)
|
||||
}
|
||||
}
|
||||
|
||||
val requestBuilder = okhttp3.Request.Builder()
|
||||
.url(urlBuilder.build())
|
||||
.headers(Headers.of(request.headers))
|
||||
when (request.verb) {
|
||||
HTTP.Verb.GET -> requestBuilder.get()
|
||||
HTTP.Verb.PUT -> requestBuilder.put(createBody(request.parameters)!!)
|
||||
HTTP.Verb.POST -> requestBuilder.post(createBody(request.parameters)!!)
|
||||
HTTP.Verb.DELETE -> requestBuilder.delete(createBody(request.parameters))
|
||||
}
|
||||
|
||||
if (request.useOnionRouting) {
|
||||
return OnionRequestAPI.sendOnionRequest(requestBuilder.build(), DEFAULT_SERVER, DEFAULT_SERVER_PUBLIC_KEY)
|
||||
.fail { e ->
|
||||
Log.e("Loki", "FileServerV2 failed with error",e)
|
||||
}
|
||||
} else {
|
||||
return Promise.ofFail(IllegalStateException("It's currently not allowed to send non onion routed requests."))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// region Sending
|
||||
fun upload(file: ByteArray): Promise<Long, Exception> {
|
||||
val base64EncodedFile = Base64.encodeBytes(file)
|
||||
val parameters = mapOf("file" to base64EncodedFile)
|
||||
val request = Request(verb = HTTP.Verb.POST, endpoint = "files", parameters = parameters)
|
||||
return send(request).map { json ->
|
||||
json["result"] as? Long ?: throw OpenGroupAPIV2.Error.PARSING_FAILED
|
||||
}
|
||||
}
|
||||
|
||||
fun download(file: Long): Promise<ByteArray, Exception> {
|
||||
val request = Request(verb = HTTP.Verb.GET, endpoint = "files/$file")
|
||||
return send(request).map { json ->
|
||||
val base64EncodedFile = json["result"] as? String ?: throw Error.PARSING_FAILED
|
||||
Base64.decode(base64EncodedFile) ?: throw Error.PARSING_FAILED
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@@ -1,16 +1,20 @@
|
||||
package org.session.libsession.messaging.jobs
|
||||
|
||||
import okhttp3.HttpUrl
|
||||
import org.session.libsession.messaging.MessagingModuleConfiguration
|
||||
import org.session.libsession.messaging.file_server.FileServerAPI
|
||||
import org.session.libsession.messaging.file_server.FileServerAPIV2
|
||||
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
|
||||
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentState
|
||||
import org.session.libsession.messaging.utilities.DotNetAPI
|
||||
import org.session.libsession.utilities.DownloadUtilities
|
||||
import org.session.libsignal.service.api.crypto.AttachmentCipherInputStream
|
||||
import org.session.libsignal.utilities.Base64
|
||||
import org.session.libsignal.utilities.logging.Log
|
||||
import java.io.File
|
||||
import java.io.FileInputStream
|
||||
|
||||
class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long): Job {
|
||||
class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) : Job {
|
||||
override var delegate: JobDelegate? = null
|
||||
override var id: String? = null
|
||||
override var failureCount: Int = 0
|
||||
@@ -22,6 +26,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
|
||||
|
||||
// Settings
|
||||
override val maxFailureCount: Int = 20
|
||||
|
||||
companion object {
|
||||
val KEY: String = "AttachmentDownloadJob"
|
||||
|
||||
@@ -46,18 +51,28 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
|
||||
}
|
||||
try {
|
||||
val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider
|
||||
val attachment = messageDataProvider.getDatabaseAttachment(attachmentID) ?: return handleFailure(Error.NoAttachment)
|
||||
val attachment = messageDataProvider.getDatabaseAttachment(attachmentID)
|
||||
?: return handleFailure(Error.NoAttachment)
|
||||
messageDataProvider.setAttachmentState(AttachmentState.STARTED, attachmentID, this.databaseMessageID)
|
||||
val tempFile = createTempFile()
|
||||
|
||||
FileServerAPI.shared.downloadFile(tempFile, attachment.url, null)
|
||||
|
||||
// Assume we're retrieving an attachment for an open group server if the digest is not set
|
||||
val stream = if (attachment.digest?.size ?: 0 == 0 || attachment.key.isNullOrEmpty()) FileInputStream(tempFile)
|
||||
else AttachmentCipherInputStream.createForAttachment(tempFile, attachment.size, Base64.decode(attachment.key), attachment.digest)
|
||||
val threadId = MessagingModuleConfiguration.shared.storage.getThreadIdForMms(databaseMessageID)
|
||||
val openGroupV2 = MessagingModuleConfiguration.shared.storage.getV2OpenGroup(threadId.toString())
|
||||
|
||||
val stream = if (openGroupV2 == null) {
|
||||
DownloadUtilities.downloadFile(tempFile, attachment.url, FileServerAPI.maxFileSize, null)
|
||||
// Assume we're retrieving an attachment for an open group server if the digest is not set
|
||||
if (attachment.digest?.size ?: 0 == 0 || attachment.key.isNullOrEmpty()) FileInputStream(tempFile)
|
||||
else AttachmentCipherInputStream.createForAttachment(tempFile, attachment.size, Base64.decode(attachment.key), attachment.digest)
|
||||
} else {
|
||||
val url = HttpUrl.parse(attachment.url)!!
|
||||
val fileId = url.pathSegments().last()
|
||||
OpenGroupAPIV2.download(fileId.toLong(), openGroupV2.room, openGroupV2.server).get().let {
|
||||
tempFile.writeBytes(it)
|
||||
}
|
||||
FileInputStream(tempFile)
|
||||
}
|
||||
messageDataProvider.insertAttachment(databaseMessageID, attachment.attachmentId, stream)
|
||||
|
||||
tempFile.delete()
|
||||
handleSuccess()
|
||||
} catch (e: Exception) {
|
||||
@@ -94,8 +109,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
|
||||
return KEY
|
||||
}
|
||||
|
||||
class Factory: Job.Factory<AttachmentDownloadJob> {
|
||||
|
||||
class Factory : Job.Factory<AttachmentDownloadJob> {
|
||||
override fun create(data: Data): AttachmentDownloadJob {
|
||||
return AttachmentDownloadJob(data.getLong(KEY_ATTACHMENT_ID), data.getLong(KEY_TS_INCOMING_MESSAGE_ID))
|
||||
}
|
||||
|
@@ -6,6 +6,7 @@ import com.esotericsoftware.kryo.io.Output
|
||||
import org.session.libsession.messaging.MessagingModuleConfiguration
|
||||
import org.session.libsession.messaging.file_server.FileServerAPI
|
||||
import org.session.libsession.messaging.messages.Message
|
||||
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
|
||||
import org.session.libsession.messaging.sending_receiving.MessageSender
|
||||
import org.session.libsession.messaging.utilities.DotNetAPI
|
||||
import org.session.libsignal.service.api.crypto.AttachmentCipherOutputStream
|
||||
@@ -46,9 +47,14 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
|
||||
?: return handleFailure(Error.NoAttachment)
|
||||
|
||||
val usePadding = false
|
||||
val openGroupV2 = MessagingModuleConfiguration.shared.storage.getV2OpenGroup(threadID)
|
||||
val openGroup = MessagingModuleConfiguration.shared.storage.getOpenGroup(threadID)
|
||||
val server = if (openGroup != null) openGroup.server else FileServerAPI.shared.server
|
||||
val shouldEncrypt = (openGroup == null) // Encrypt if this isn't an open group
|
||||
val server = openGroup?.let {
|
||||
it.server
|
||||
} ?: openGroupV2?.let {
|
||||
it.server
|
||||
} ?: FileServerAPI.shared.server
|
||||
val shouldEncrypt = (openGroup == null && openGroupV2 == null) // Encrypt if this isn't an open group
|
||||
|
||||
val attachmentKey = Util.getSecretBytes(64)
|
||||
val paddedLength = if (usePadding) PaddingInputStream.getPaddedSize(attachment.length) else attachment.length
|
||||
@@ -58,7 +64,11 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
|
||||
val outputStreamFactory = if (shouldEncrypt) AttachmentCipherOutputStreamFactory(attachmentKey) else PlaintextOutputStreamFactory()
|
||||
val attachmentData = PushAttachmentData(attachment.contentType, dataStream, ciphertextLength, outputStreamFactory, attachment.listener)
|
||||
|
||||
val uploadResult = FileServerAPI.shared.uploadAttachment(server, attachmentData)
|
||||
val uploadResult = if (openGroupV2 == null) FileServerAPI.shared.uploadAttachment(server, attachmentData) else {
|
||||
val dataBytes = attachmentData.data.readBytes()
|
||||
val result = OpenGroupAPIV2.upload(dataBytes, openGroupV2.room, openGroupV2.server).get()
|
||||
DotNetAPI.UploadResult(result, "${openGroupV2.server}/files/$result", byteArrayOf())
|
||||
}
|
||||
handleSuccess(attachment, attachmentKey, uploadResult)
|
||||
} catch (e: java.lang.Exception) {
|
||||
if (e == Error.NoAttachment) {
|
||||
|
@@ -18,6 +18,7 @@ class JobQueue : JobDelegate {
|
||||
private var hasResumedPendingJobs = false // Just for debugging
|
||||
private val jobTimestampMap = ConcurrentHashMap<Long, AtomicInteger>()
|
||||
private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
|
||||
private val multiDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
|
||||
private val scope = GlobalScope + SupervisorJob()
|
||||
private val queue = Channel<Job>(UNLIMITED)
|
||||
|
||||
@@ -28,8 +29,15 @@ class JobQueue : JobDelegate {
|
||||
scope.launch(dispatcher) {
|
||||
while (isActive) {
|
||||
queue.receive().let { job ->
|
||||
job.delegate = this@JobQueue
|
||||
job.execute()
|
||||
if (job.canExecuteParallel()) {
|
||||
launch(multiDispatcher) {
|
||||
job.delegate = this@JobQueue
|
||||
job.execute()
|
||||
}
|
||||
} else {
|
||||
job.delegate = this@JobQueue
|
||||
job.execute()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -40,6 +48,13 @@ class JobQueue : JobDelegate {
|
||||
val shared: JobQueue by lazy { JobQueue() }
|
||||
}
|
||||
|
||||
private fun Job.canExecuteParallel(): Boolean {
|
||||
return this.javaClass in arrayOf(
|
||||
AttachmentUploadJob::class.java,
|
||||
AttachmentDownloadJob::class.java
|
||||
)
|
||||
}
|
||||
|
||||
fun add(job: Job) {
|
||||
addWithoutExecuting(job)
|
||||
queue.offer(job) // offer always called on unlimited capacity
|
||||
|
@@ -1,10 +1,15 @@
|
||||
package org.session.libsession.messaging.messages
|
||||
|
||||
import org.session.libsession.messaging.MessagingModuleConfiguration
|
||||
import org.session.libsession.messaging.open_groups.OpenGroupV2
|
||||
import org.session.libsession.messaging.open_groups.OpenGroup
|
||||
import org.session.libsession.messaging.threads.Address
|
||||
import org.session.libsession.utilities.GroupUtil
|
||||
import org.session.libsignal.service.loki.utilities.toHexString
|
||||
|
||||
typealias OpenGroupModel = OpenGroup
|
||||
typealias OpenGroupV2Model = OpenGroupV2
|
||||
|
||||
sealed class Destination {
|
||||
|
||||
class Contact(var publicKey: String) : Destination() {
|
||||
@@ -16,6 +21,9 @@ sealed class Destination {
|
||||
class OpenGroup(var channel: Long, var server: String) : Destination() {
|
||||
internal constructor(): this(0, "")
|
||||
}
|
||||
class OpenGroupV2(var room: String, var server: String): Destination() {
|
||||
internal constructor(): this("", "")
|
||||
}
|
||||
|
||||
companion object {
|
||||
fun from(address: Address): Destination {
|
||||
@@ -29,9 +37,13 @@ sealed class Destination {
|
||||
ClosedGroup(groupPublicKey)
|
||||
}
|
||||
address.isOpenGroup -> {
|
||||
val threadID = MessagingModuleConfiguration.shared.storage.getThreadID(address.contactIdentifier())!!
|
||||
val openGroup = MessagingModuleConfiguration.shared.storage.getOpenGroup(threadID)!!
|
||||
OpenGroup(openGroup.channel, openGroup.server)
|
||||
val storage = MessagingModuleConfiguration.shared.storage
|
||||
val threadID = storage.getThreadID(address.contactIdentifier())!!
|
||||
when (val openGroup = storage.getOpenGroup(threadID) ?: storage.getV2OpenGroup(threadID)) {
|
||||
is OpenGroupModel -> OpenGroup(openGroup.channel, openGroup.server)
|
||||
is OpenGroupV2Model -> OpenGroupV2(openGroup.room, openGroup.server)
|
||||
else -> throw Exception("Invalid OpenGroup $openGroup")
|
||||
}
|
||||
}
|
||||
else -> {
|
||||
throw Exception("TODO: Handle legacy closed groups.")
|
||||
|
@@ -113,8 +113,11 @@ class ConfigurationMessage(var closedGroups: List<ClosedGroup>, var openGroups:
|
||||
}
|
||||
if (groupRecord.isOpenGroup) {
|
||||
val threadID = storage.getThreadID(groupRecord.encodedId) ?: continue
|
||||
val openGroup = storage.getOpenGroup(threadID) ?: continue
|
||||
openGroups.add(openGroup.server)
|
||||
val openGroup = storage.getOpenGroup(threadID)
|
||||
val openGroupV2 = storage.getV2OpenGroup(threadID)
|
||||
|
||||
val shareUrl = openGroup?.server ?: openGroupV2?.toJoinUrl() ?: continue
|
||||
openGroups.add(shareUrl)
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -0,0 +1,506 @@
|
||||
package org.session.libsession.messaging.open_groups
|
||||
|
||||
import com.fasterxml.jackson.databind.PropertyNamingStrategy
|
||||
import com.fasterxml.jackson.databind.annotation.JsonNaming
|
||||
import com.fasterxml.jackson.databind.type.TypeFactory
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import nl.komponents.kovenant.Kovenant
|
||||
import nl.komponents.kovenant.Promise
|
||||
import nl.komponents.kovenant.functional.bind
|
||||
import nl.komponents.kovenant.functional.map
|
||||
import okhttp3.Headers
|
||||
import okhttp3.HttpUrl
|
||||
import okhttp3.MediaType
|
||||
import okhttp3.RequestBody
|
||||
import org.session.libsession.messaging.MessagingModuleConfiguration
|
||||
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2.Error
|
||||
import org.session.libsession.snode.OnionRequestAPI
|
||||
import org.session.libsession.utilities.AESGCM
|
||||
import org.session.libsignal.service.loki.HTTP
|
||||
import org.session.libsignal.service.loki.HTTP.Verb.*
|
||||
import org.session.libsignal.service.loki.utilities.removing05PrefixIfNeeded
|
||||
import org.session.libsignal.service.loki.utilities.toHexString
|
||||
import org.session.libsignal.utilities.Base64.*
|
||||
import org.session.libsignal.utilities.Hex
|
||||
import org.session.libsignal.utilities.JsonUtil
|
||||
import org.session.libsignal.utilities.logging.Log
|
||||
import org.whispersystems.curve25519.Curve25519
|
||||
import java.util.*
|
||||
|
||||
object OpenGroupAPIV2 {
|
||||
|
||||
private val moderators: HashMap<String, Set<String>> = hashMapOf() // Server URL to (channel ID to set of moderator IDs)
|
||||
const val DEFAULT_SERVER = "http://116.203.70.33"
|
||||
private const val DEFAULT_SERVER_PUBLIC_KEY = "a03c383cf63c3c4efe67acc52112a6dd734b3a946b9545f488aaa93da7991238"
|
||||
|
||||
val defaultRooms = MutableSharedFlow<List<DefaultGroup>>(replay = 1)
|
||||
|
||||
private val curve = Curve25519.getInstance(Curve25519.BEST)
|
||||
|
||||
sealed class Error : Exception() {
|
||||
object GENERIC : Error()
|
||||
object PARSING_FAILED : Error()
|
||||
object DECRYPTION_FAILED : Error()
|
||||
object SIGNING_FAILED : Error()
|
||||
object INVALID_URL : Error()
|
||||
object NO_PUBLIC_KEY : Error()
|
||||
|
||||
fun errorDescription() = when (this) {
|
||||
Error.GENERIC -> "An error occurred."
|
||||
Error.PARSING_FAILED -> "Invalid response."
|
||||
Error.DECRYPTION_FAILED -> "Couldn't decrypt response."
|
||||
Error.SIGNING_FAILED -> "Couldn't sign message."
|
||||
Error.INVALID_URL -> "Invalid URL."
|
||||
Error.NO_PUBLIC_KEY -> "Couldn't find server public key."
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
data class DefaultGroup(val id: String,
|
||||
val name: String,
|
||||
val image: ByteArray?) {
|
||||
fun toJoinUrl(): String = "$DEFAULT_SERVER/$id?public_key=$DEFAULT_SERVER_PUBLIC_KEY"
|
||||
}
|
||||
|
||||
data class Info(
|
||||
val id: String,
|
||||
val name: String,
|
||||
val imageID: String?
|
||||
)
|
||||
|
||||
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy::class)
|
||||
data class CompactPollRequest(val roomId: String,
|
||||
val authToken: String,
|
||||
val fromDeletionServerId: Long?,
|
||||
val fromMessageServerId: Long?
|
||||
)
|
||||
|
||||
data class CompactPollResult(val messages: List<OpenGroupMessageV2>,
|
||||
val deletions: List<Long>,
|
||||
val moderators: List<String>
|
||||
)
|
||||
|
||||
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy::class)
|
||||
data class MessageDeletion @JvmOverloads constructor(val id: Long = 0,
|
||||
val deletedMessageId: Long = 0
|
||||
) {
|
||||
companion object {
|
||||
val EMPTY = MessageDeletion()
|
||||
}
|
||||
}
|
||||
|
||||
data class Request(
|
||||
val verb: HTTP.Verb,
|
||||
val room: String?,
|
||||
val server: String,
|
||||
val endpoint: String,
|
||||
val queryParameters: Map<String, String> = mapOf(),
|
||||
val parameters: Any? = null,
|
||||
val headers: Map<String, String> = mapOf(),
|
||||
val isAuthRequired: Boolean = true,
|
||||
// Always `true` under normal circumstances. You might want to disable
|
||||
// this when running over Lokinet.
|
||||
val useOnionRouting: Boolean = true
|
||||
)
|
||||
|
||||
private fun createBody(parameters: Any?): RequestBody? {
|
||||
if (parameters == null) return null
|
||||
|
||||
val parametersAsJSON = JsonUtil.toJson(parameters)
|
||||
return RequestBody.create(MediaType.get("application/json"), parametersAsJSON)
|
||||
}
|
||||
|
||||
private fun send(request: Request, isJsonRequired: Boolean = true): Promise<Map<*, *>, Exception> {
|
||||
val parsed = HttpUrl.parse(request.server) ?: return Promise.ofFail(Error.INVALID_URL)
|
||||
val urlBuilder = HttpUrl.Builder()
|
||||
.scheme(parsed.scheme())
|
||||
.host(parsed.host())
|
||||
.port(parsed.port())
|
||||
.addPathSegments(request.endpoint)
|
||||
|
||||
if (request.verb == GET) {
|
||||
for ((key, value) in request.queryParameters) {
|
||||
urlBuilder.addQueryParameter(key, value)
|
||||
}
|
||||
}
|
||||
|
||||
fun execute(token: String?): Promise<Map<*, *>, Exception> {
|
||||
val requestBuilder = okhttp3.Request.Builder()
|
||||
.url(urlBuilder.build())
|
||||
.headers(Headers.of(request.headers))
|
||||
if (request.isAuthRequired) {
|
||||
if (token.isNullOrEmpty()) throw IllegalStateException("No auth token for request")
|
||||
requestBuilder.header("Authorization", token)
|
||||
}
|
||||
when (request.verb) {
|
||||
GET -> requestBuilder.get()
|
||||
PUT -> requestBuilder.put(createBody(request.parameters)!!)
|
||||
POST -> requestBuilder.post(createBody(request.parameters)!!)
|
||||
DELETE -> requestBuilder.delete(createBody(request.parameters))
|
||||
}
|
||||
|
||||
if (!request.room.isNullOrEmpty()) {
|
||||
requestBuilder.header("Room", request.room)
|
||||
}
|
||||
|
||||
if (request.useOnionRouting) {
|
||||
val publicKey = MessagingModuleConfiguration.shared.storage.getOpenGroupPublicKey(request.server)
|
||||
?: return Promise.ofFail(Error.NO_PUBLIC_KEY)
|
||||
return OnionRequestAPI.sendOnionRequest(requestBuilder.build(), request.server, publicKey, isJSONRequired = isJsonRequired)
|
||||
.fail { e ->
|
||||
if (e is OnionRequestAPI.HTTPRequestFailedAtDestinationException && e.statusCode == 401) {
|
||||
val storage = MessagingModuleConfiguration.shared.storage
|
||||
if (request.room != null) {
|
||||
storage.removeAuthToken("${request.server}.${request.room}")
|
||||
} else {
|
||||
storage.removeAuthToken(request.server)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Promise.ofFail(IllegalStateException("It's currently not allowed to send non onion routed requests."))
|
||||
}
|
||||
}
|
||||
return if (request.isAuthRequired) {
|
||||
getAuthToken(request.room!!, request.server).bind { execute(it) }
|
||||
} else {
|
||||
execute(null)
|
||||
}
|
||||
}
|
||||
|
||||
fun downloadOpenGroupProfilePicture(roomID: String, server: String): Promise<ByteArray, Exception> {
|
||||
val request = Request(verb = GET, room = roomID, server = server, endpoint = "rooms/$roomID/image", isAuthRequired = false)
|
||||
return send(request).map { json ->
|
||||
val result = json["result"] as? String ?: throw Error.PARSING_FAILED
|
||||
decode(result)
|
||||
}
|
||||
}
|
||||
|
||||
fun getAuthToken(room: String, server: String): Promise<String, Exception> {
|
||||
val storage = MessagingModuleConfiguration.shared.storage
|
||||
return storage.getAuthToken(room, server)?.let {
|
||||
Promise.of(it)
|
||||
} ?: run {
|
||||
requestNewAuthToken(room, server)
|
||||
.bind { claimAuthToken(it, room, server) }
|
||||
.success { authToken ->
|
||||
storage.setAuthToken(room, server, authToken)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun requestNewAuthToken(room: String, server: String): Promise<String, Exception> {
|
||||
val (publicKey, privateKey) = MessagingModuleConfiguration.shared.storage.getUserKeyPair()
|
||||
?: return Promise.ofFail(Error.GENERIC)
|
||||
val queryParameters = mutableMapOf("public_key" to publicKey)
|
||||
val request = Request(GET, room, server, "auth_token_challenge", queryParameters, isAuthRequired = false, parameters = null)
|
||||
return send(request).map { json ->
|
||||
val challenge = json["challenge"] as? Map<*, *> ?: throw Error.PARSING_FAILED
|
||||
val base64EncodedCiphertext = challenge["ciphertext"] as? String
|
||||
?: throw Error.PARSING_FAILED
|
||||
val base64EncodedEphemeralPublicKey = challenge["ephemeral_public_key"] as? String
|
||||
?: throw Error.PARSING_FAILED
|
||||
val ciphertext = decode(base64EncodedCiphertext)
|
||||
val ephemeralPublicKey = decode(base64EncodedEphemeralPublicKey)
|
||||
val symmetricKey = AESGCM.generateSymmetricKey(ephemeralPublicKey, privateKey)
|
||||
val tokenAsData = try {
|
||||
AESGCM.decrypt(ciphertext, symmetricKey)
|
||||
} catch (e: Exception) {
|
||||
throw Error.DECRYPTION_FAILED
|
||||
}
|
||||
tokenAsData.toHexString()
|
||||
}
|
||||
}
|
||||
|
||||
fun claimAuthToken(authToken: String, room: String, server: String): Promise<String, Exception> {
|
||||
val parameters = mapOf("public_key" to MessagingModuleConfiguration.shared.storage.getUserPublicKey()!!)
|
||||
val headers = mapOf("Authorization" to authToken)
|
||||
val request = Request(verb = POST, room = room, server = server, endpoint = "claim_auth_token",
|
||||
parameters = parameters, headers = headers, isAuthRequired = false)
|
||||
return send(request).map { authToken }
|
||||
}
|
||||
|
||||
fun deleteAuthToken(room: String, server: String): Promise<Unit, Exception> {
|
||||
val request = Request(verb = DELETE, room = room, server = server, endpoint = "auth_token")
|
||||
return send(request).map {
|
||||
MessagingModuleConfiguration.shared.storage.removeAuthToken(room, server)
|
||||
}
|
||||
}
|
||||
|
||||
// region Sending
|
||||
fun upload(file: ByteArray, room: String, server: String): Promise<Long, Exception> {
|
||||
val base64EncodedFile = encodeBytes(file)
|
||||
val parameters = mapOf("file" to base64EncodedFile)
|
||||
val request = Request(verb = POST, room = room, server = server, endpoint = "files", parameters = parameters)
|
||||
return send(request).map { json ->
|
||||
json["result"] as? Long ?: throw Error.PARSING_FAILED
|
||||
}
|
||||
}
|
||||
|
||||
fun download(file: Long, room: String, server: String): Promise<ByteArray, Exception> {
|
||||
val request = Request(verb = GET, room = room, server = server, endpoint = "files/$file")
|
||||
return send(request).map { json ->
|
||||
val base64EncodedFile = json["result"] as? String ?: throw Error.PARSING_FAILED
|
||||
decode(base64EncodedFile) ?: throw Error.PARSING_FAILED
|
||||
}
|
||||
}
|
||||
|
||||
fun send(message: OpenGroupMessageV2, room: String, server: String): Promise<OpenGroupMessageV2, Exception> {
|
||||
val signedMessage = message.sign() ?: return Promise.ofFail(Error.SIGNING_FAILED)
|
||||
val jsonMessage = signedMessage.toJSON()
|
||||
val request = Request(verb = POST, room = room, server = server, endpoint = "messages", parameters = jsonMessage)
|
||||
return send(request).map { json ->
|
||||
@Suppress("UNCHECKED_CAST") val rawMessage = json["message"] as? Map<String, Any>
|
||||
?: throw Error.PARSING_FAILED
|
||||
OpenGroupMessageV2.fromJSON(rawMessage) ?: throw Error.PARSING_FAILED
|
||||
}
|
||||
}
|
||||
// endregion
|
||||
|
||||
// region Messages
|
||||
fun getMessages(room: String, server: String): Promise<List<OpenGroupMessageV2>, Exception> {
|
||||
val storage = MessagingModuleConfiguration.shared.storage
|
||||
val queryParameters = mutableMapOf<String, String>()
|
||||
storage.getLastMessageServerId(room, server)?.let { lastId ->
|
||||
queryParameters += "from_server_id" to lastId.toString()
|
||||
}
|
||||
val request = Request(verb = GET, room = room, server = server, endpoint = "messages", queryParameters = queryParameters)
|
||||
return send(request).map { jsonList ->
|
||||
@Suppress("UNCHECKED_CAST") val rawMessages = jsonList["messages"] as? List<Map<String, Any>>
|
||||
?: throw Error.PARSING_FAILED
|
||||
val lastMessageServerId = storage.getLastMessageServerId(room, server) ?: 0
|
||||
|
||||
var currentMax = lastMessageServerId
|
||||
val messages = rawMessages.mapNotNull { json ->
|
||||
try {
|
||||
val message = OpenGroupMessageV2.fromJSON(json) ?: return@mapNotNull null
|
||||
if (message.serverID == null || message.sender.isNullOrEmpty()) return@mapNotNull null
|
||||
val sender = message.sender
|
||||
val data = decode(message.base64EncodedData)
|
||||
val signature = decode(message.base64EncodedSignature)
|
||||
val publicKey = Hex.fromStringCondensed(sender.removing05PrefixIfNeeded())
|
||||
val isValid = curve.verifySignature(publicKey, data, signature)
|
||||
if (!isValid) {
|
||||
Log.d("Loki", "Ignoring message with invalid signature")
|
||||
return@mapNotNull null
|
||||
}
|
||||
if (message.serverID > lastMessageServerId) {
|
||||
currentMax = message.serverID
|
||||
}
|
||||
message
|
||||
} catch (e: Exception) {
|
||||
null
|
||||
}
|
||||
}
|
||||
storage.setLastMessageServerId(room, server, currentMax)
|
||||
messages
|
||||
}
|
||||
}
|
||||
// endregion
|
||||
|
||||
// region Message Deletion
|
||||
@JvmStatic
|
||||
fun deleteMessage(serverID: Long, room: String, server: String): Promise<Unit, Exception> {
|
||||
val request = Request(verb = DELETE, room = room, server = server, endpoint = "messages/$serverID")
|
||||
return send(request).map {
|
||||
Log.d("Loki", "Deleted server message")
|
||||
}
|
||||
}
|
||||
|
||||
fun getDeletedMessages(room: String, server: String): Promise<List<MessageDeletion>, Exception> {
|
||||
val storage = MessagingModuleConfiguration.shared.storage
|
||||
val queryParameters = mutableMapOf<String, String>()
|
||||
storage.getLastDeletionServerId(room, server)?.let { last ->
|
||||
queryParameters["from_server_id"] = last.toString()
|
||||
}
|
||||
val request = Request(verb = GET, room = room, server = server, endpoint = "deleted_messages", queryParameters = queryParameters)
|
||||
return send(request).map { json ->
|
||||
val type = TypeFactory.defaultInstance().constructCollectionType(List::class.java, MessageDeletion::class.java)
|
||||
val idsAsString = JsonUtil.toJson(json["ids"])
|
||||
val serverIDs = JsonUtil.fromJson<List<MessageDeletion>>(idsAsString, type) ?: throw Error.PARSING_FAILED
|
||||
val lastMessageServerId = storage.getLastDeletionServerId(room, server) ?: 0
|
||||
val serverID = serverIDs.maxByOrNull {it.id } ?: MessageDeletion.EMPTY
|
||||
if (serverID.id > lastMessageServerId) {
|
||||
storage.setLastDeletionServerId(room, server, serverID.id)
|
||||
}
|
||||
serverIDs
|
||||
}
|
||||
}
|
||||
// endregion
|
||||
|
||||
// region Moderation
|
||||
private fun handleModerators(serverRoomId: String, moderatorList: List<String>) {
|
||||
moderators[serverRoomId] = moderatorList.toMutableSet()
|
||||
}
|
||||
|
||||
fun getModerators(room: String, server: String): Promise<List<String>, Exception> {
|
||||
val request = Request(verb = GET, room = room, server = server, endpoint = "moderators")
|
||||
return send(request).map { json ->
|
||||
@Suppress("UNCHECKED_CAST") val moderatorsJson = json["moderators"] as? List<String>
|
||||
?: throw Error.PARSING_FAILED
|
||||
val id = "$server.$room"
|
||||
handleModerators(id, moderatorsJson)
|
||||
moderatorsJson
|
||||
}
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
fun ban(publicKey: String, room: String, server: String): Promise<Unit, Exception> {
|
||||
val parameters = mapOf("public_key" to publicKey)
|
||||
val request = Request(verb = POST, room = room, server = server, endpoint = "block_list", parameters = parameters)
|
||||
return send(request).map {
|
||||
Log.d("Loki", "Banned user $publicKey from $server.$room")
|
||||
}
|
||||
}
|
||||
|
||||
fun unban(publicKey: String, room: String, server: String): Promise<Unit, Exception> {
|
||||
val request = Request(verb = DELETE, room = room, server = server, endpoint = "block_list/$publicKey")
|
||||
return send(request).map {
|
||||
Log.d("Loki", "Unbanned user $publicKey from $server.$room")
|
||||
}
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
fun isUserModerator(publicKey: String, room: String, server: String): Boolean =
|
||||
moderators["$server.$room"]?.contains(publicKey) ?: false
|
||||
// endregion
|
||||
|
||||
// region General
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
fun getCompactPoll(rooms: List<String>, server: String): Promise<Map<String, CompactPollResult>, Exception> {
|
||||
val requestAuths = rooms.associateWith { room -> getAuthToken(room, server) }
|
||||
val storage = MessagingModuleConfiguration.shared.storage
|
||||
val requests = rooms.mapNotNull { room ->
|
||||
val authToken = try {
|
||||
requestAuths[room]?.get()
|
||||
} catch (e: Exception) {
|
||||
Log.e("Loki", "Failed to get auth token for $room", e)
|
||||
null
|
||||
} ?: return@mapNotNull null
|
||||
|
||||
CompactPollRequest(roomId = room,
|
||||
authToken = authToken,
|
||||
fromDeletionServerId = storage.getLastDeletionServerId(room, server),
|
||||
fromMessageServerId = storage.getLastMessageServerId(room, server)
|
||||
)
|
||||
}
|
||||
val request = Request(verb = POST, room = null, server = server, endpoint = "compact_poll", isAuthRequired = false, parameters = mapOf("requests" to requests))
|
||||
// build a request for all rooms
|
||||
return send(request = request).map { json ->
|
||||
val results = json["results"] as? List<*> ?: throw Error.PARSING_FAILED
|
||||
|
||||
results.mapNotNull { roomJson ->
|
||||
if (roomJson !is Map<*,*>) return@mapNotNull null
|
||||
val roomId = roomJson["room_id"] as? String ?: return@mapNotNull null
|
||||
|
||||
// check the status was fine
|
||||
val statusCode = roomJson["status_code"] as? Int ?: return@mapNotNull null
|
||||
if (statusCode == 401) {
|
||||
// delete auth token and return null
|
||||
storage.removeAuthToken(roomId, server)
|
||||
}
|
||||
|
||||
// check and store mods
|
||||
val moderators = roomJson["moderators"] as? List<String> ?: return@mapNotNull null
|
||||
handleModerators("$server.$roomId", moderators)
|
||||
|
||||
// get deletions
|
||||
val type = TypeFactory.defaultInstance().constructCollectionType(List::class.java, MessageDeletion::class.java)
|
||||
val idsAsString = JsonUtil.toJson(roomJson["deletions"])
|
||||
val deletedServerIDs = JsonUtil.fromJson<List<MessageDeletion>>(idsAsString, type) ?: throw Error.PARSING_FAILED
|
||||
val lastDeletionServerId = storage.getLastDeletionServerId(roomId, server) ?: 0
|
||||
val serverID = deletedServerIDs.maxByOrNull {it.id } ?: MessageDeletion.EMPTY
|
||||
if (serverID.id > lastDeletionServerId) {
|
||||
storage.setLastDeletionServerId(roomId, server, serverID.id)
|
||||
}
|
||||
|
||||
// get messages
|
||||
val rawMessages = roomJson["messages"] as? List<Map<String, Any>> ?: return@mapNotNull null // parsing failed
|
||||
|
||||
val lastMessageServerId = storage.getLastMessageServerId(roomId, server) ?: 0
|
||||
var currentMax = lastMessageServerId
|
||||
val messages = rawMessages.mapNotNull { rawMessage ->
|
||||
val message = OpenGroupMessageV2.fromJSON(rawMessage)?.apply {
|
||||
currentMax = maxOf(currentMax,this.serverID ?: 0)
|
||||
}
|
||||
message
|
||||
}
|
||||
storage.setLastMessageServerId(roomId, server, currentMax)
|
||||
roomId to CompactPollResult(
|
||||
messages = messages,
|
||||
deletions = deletedServerIDs.map { it.deletedMessageId },
|
||||
moderators = moderators
|
||||
)
|
||||
}.toMap()
|
||||
}
|
||||
}
|
||||
|
||||
fun getDefaultRoomsIfNeeded(): Promise<List<DefaultGroup>, Exception> {
|
||||
val storage = MessagingModuleConfiguration.shared.storage
|
||||
storage.setOpenGroupPublicKey(DEFAULT_SERVER, DEFAULT_SERVER_PUBLIC_KEY)
|
||||
return getAllRooms(DEFAULT_SERVER).map { groups ->
|
||||
val earlyGroups = groups.map { group ->
|
||||
DefaultGroup(group.id, group.name, null)
|
||||
}
|
||||
// see if we have any cached rooms, and if they already have images, don't overwrite with early non-image results
|
||||
defaultRooms.replayCache.firstOrNull()?.let { replayed ->
|
||||
if (replayed.none { it.image?.isNotEmpty() == true}) {
|
||||
defaultRooms.tryEmit(earlyGroups)
|
||||
}
|
||||
}
|
||||
val images = groups.map { group ->
|
||||
group.id to downloadOpenGroupProfilePicture(group.id, DEFAULT_SERVER)
|
||||
}.toMap()
|
||||
|
||||
groups.map { group ->
|
||||
val image = try {
|
||||
images[group.id]!!.get()
|
||||
} catch (e: Exception) {
|
||||
// no image or image failed to download
|
||||
null
|
||||
}
|
||||
DefaultGroup(group.id, group.name, image)
|
||||
}
|
||||
}.success { new ->
|
||||
defaultRooms.tryEmit(new)
|
||||
}
|
||||
}
|
||||
|
||||
fun getInfo(room: String, server: String): Promise<Info, Exception> {
|
||||
val request = Request(verb = GET, room = null, server = server, endpoint = "rooms/$room", isAuthRequired = false)
|
||||
return send(request).map { json ->
|
||||
val rawRoom = json["room"] as? Map<*, *> ?: throw Error.PARSING_FAILED
|
||||
val id = rawRoom["id"] as? String ?: throw Error.PARSING_FAILED
|
||||
val name = rawRoom["name"] as? String ?: throw Error.PARSING_FAILED
|
||||
val imageID = rawRoom["image_id"] as? String
|
||||
Info(id = id, name = name, imageID = imageID)
|
||||
}
|
||||
}
|
||||
|
||||
fun getAllRooms(server: String): Promise<List<Info>, Exception> {
|
||||
val request = Request(verb = GET, room = null, server = server, endpoint = "rooms", isAuthRequired = false)
|
||||
return send(request).map { json ->
|
||||
val rawRooms = json["rooms"] as? List<Map<*, *>> ?: throw Error.PARSING_FAILED
|
||||
rawRooms.mapNotNull {
|
||||
val roomJson = it as? Map<*, *> ?: return@mapNotNull null
|
||||
val id = roomJson["id"] as? String ?: return@mapNotNull null
|
||||
val name = roomJson["name"] as? String ?: return@mapNotNull null
|
||||
val imageId = roomJson["image_id"] as? String
|
||||
Info(id, name, imageId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun getMemberCount(room: String, server: String): Promise<Int, Exception> {
|
||||
val request = Request(verb = GET, room = room, server = server, endpoint = "member_count")
|
||||
return send(request).map { json ->
|
||||
val memberCount = json["member_count"] as? Int ?: throw Error.PARSING_FAILED
|
||||
val storage = MessagingModuleConfiguration.shared.storage
|
||||
storage.setUserCount(room, server, memberCount)
|
||||
memberCount
|
||||
}
|
||||
}
|
||||
// endregion
|
||||
|
||||
}
|
@@ -0,0 +1,69 @@
|
||||
package org.session.libsession.messaging.open_groups
|
||||
|
||||
import org.session.libsession.messaging.MessagingModuleConfiguration
|
||||
import org.session.libsignal.service.internal.push.PushTransportDetails
|
||||
import org.session.libsignal.service.internal.push.SignalServiceProtos
|
||||
import org.session.libsignal.utilities.Base64
|
||||
import org.session.libsignal.utilities.Base64.decode
|
||||
import org.session.libsignal.utilities.logging.Log
|
||||
import org.whispersystems.curve25519.Curve25519
|
||||
|
||||
data class OpenGroupMessageV2(
|
||||
val serverID: Long? = null,
|
||||
val sender: String?,
|
||||
val sentTimestamp: Long,
|
||||
// The serialized protobuf in base64 encoding
|
||||
val base64EncodedData: String,
|
||||
// When sending a message, the sender signs the serialized protobuf with their private key so that
|
||||
// a receiving user can verify that the message wasn't tampered with.
|
||||
val base64EncodedSignature: String? = null
|
||||
) {
|
||||
|
||||
companion object {
|
||||
private val curve = Curve25519.getInstance(Curve25519.BEST)
|
||||
|
||||
fun fromJSON(json: Map<String, Any>): OpenGroupMessageV2? {
|
||||
val base64EncodedData = json["data"] as? String ?: return null
|
||||
val sentTimestamp = json["timestamp"] as? Long ?: return null
|
||||
val serverID = json["server_id"] as? Int
|
||||
val sender = json["public_key"] as? String
|
||||
val base64EncodedSignature = json["signature"] as? String
|
||||
return OpenGroupMessageV2(serverID = serverID?.toLong(),
|
||||
sender = sender,
|
||||
sentTimestamp = sentTimestamp,
|
||||
base64EncodedData = base64EncodedData,
|
||||
base64EncodedSignature = base64EncodedSignature
|
||||
)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
fun sign(): OpenGroupMessageV2? {
|
||||
if (base64EncodedData.isEmpty()) return null
|
||||
val (publicKey, privateKey) = MessagingModuleConfiguration.shared.storage.getUserKeyPair() ?: return null
|
||||
|
||||
if (sender != publicKey) return null // only sign our own messages?
|
||||
|
||||
val signature = try {
|
||||
curve.calculateSignature(privateKey, decode(base64EncodedData))
|
||||
} catch (e: Exception) {
|
||||
Log.e("Loki", "Couldn't sign OpenGroupV2Message", e)
|
||||
return null
|
||||
}
|
||||
|
||||
return copy(base64EncodedSignature = Base64.encodeBytes(signature))
|
||||
}
|
||||
|
||||
fun toJSON(): Map<String, Any> {
|
||||
val jsonMap = mutableMapOf("data" to base64EncodedData, "timestamp" to sentTimestamp)
|
||||
serverID?.let { jsonMap["server_id"] = serverID }
|
||||
sender?.let { jsonMap["public_key"] = sender }
|
||||
base64EncodedSignature?.let { jsonMap["signature"] = base64EncodedSignature }
|
||||
return jsonMap
|
||||
}
|
||||
|
||||
fun toProto(): SignalServiceProtos.Content = decode(base64EncodedData).let(PushTransportDetails::getStrippedPaddingMessageBody).let { bytes ->
|
||||
SignalServiceProtos.Content.parseFrom(bytes)
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,51 @@
|
||||
package org.session.libsession.messaging.open_groups
|
||||
|
||||
import org.session.libsignal.utilities.JsonUtil
|
||||
import java.util.*
|
||||
|
||||
data class OpenGroupV2(
|
||||
val server: String,
|
||||
val room: String,
|
||||
val id: String,
|
||||
val name: String,
|
||||
val publicKey: String
|
||||
) {
|
||||
|
||||
constructor(server: String, room: String, name: String, publicKey: String) : this(
|
||||
server = server,
|
||||
room = room,
|
||||
id = "$server.$room",
|
||||
name = name,
|
||||
publicKey = publicKey,
|
||||
)
|
||||
|
||||
companion object {
|
||||
|
||||
fun fromJson(jsonAsString: String): OpenGroupV2? {
|
||||
return try {
|
||||
val json = JsonUtil.fromJson(jsonAsString)
|
||||
if (!json.has("room")) return null
|
||||
|
||||
val room = json.get("room").asText().toLowerCase(Locale.getDefault())
|
||||
val server = json.get("server").asText().toLowerCase(Locale.getDefault())
|
||||
val displayName = json.get("displayName").asText()
|
||||
val publicKey = json.get("publicKey").asText()
|
||||
|
||||
OpenGroupV2(server, room, displayName, publicKey)
|
||||
} catch (e: Exception) {
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
fun toJoinUrl(): String = "$server/$room?public_key=$publicKey"
|
||||
|
||||
fun toJson(): Map<String,String> = mapOf(
|
||||
"room" to room,
|
||||
"server" to server,
|
||||
"displayName" to name,
|
||||
"publicKey" to publicKey,
|
||||
)
|
||||
|
||||
}
|
@@ -7,7 +7,6 @@ import org.session.libsession.messaging.messages.visible.VisibleMessage
|
||||
import org.session.libsession.utilities.GroupUtil
|
||||
import org.session.libsignal.service.internal.push.PushTransportDetails
|
||||
import org.session.libsignal.service.internal.push.SignalServiceProtos
|
||||
import org.session.libsignal.utilities.logging.Log
|
||||
|
||||
object MessageReceiver {
|
||||
|
||||
@@ -37,6 +36,7 @@ object MessageReceiver {
|
||||
is UnknownEnvelopeType -> false
|
||||
is InvalidSignature -> false
|
||||
is NoData -> false
|
||||
is NoThread -> false
|
||||
is SenderBlocked -> false
|
||||
is SelfSend -> false
|
||||
else -> true
|
||||
@@ -122,7 +122,6 @@ object MessageReceiver {
|
||||
message.recipient = userPublicKey
|
||||
message.sentTimestamp = envelope.timestamp
|
||||
message.receivedTimestamp = if (envelope.hasServerTimestamp()) envelope.serverTimestamp else System.currentTimeMillis()
|
||||
Log.d("Loki", "time: ${envelope.timestamp}, sent: ${envelope.serverTimestamp}")
|
||||
message.groupPublicKey = groupPublicKey
|
||||
message.openGroupServerMessageID = openGroupServerID
|
||||
// Validate
|
||||
|
@@ -12,14 +12,15 @@ import org.session.libsession.messaging.messages.control.ClosedGroupControlMessa
|
||||
import org.session.libsession.messaging.messages.control.ConfigurationMessage
|
||||
import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
|
||||
import org.session.libsession.messaging.messages.visible.*
|
||||
import org.session.libsession.messaging.open_groups.OpenGroupAPI
|
||||
import org.session.libsession.messaging.open_groups.OpenGroupMessage
|
||||
import org.session.libsession.messaging.open_groups.*
|
||||
import org.session.libsession.messaging.threads.Address
|
||||
import org.session.libsession.messaging.threads.recipients.Recipient
|
||||
import org.session.libsession.messaging.utilities.MessageWrapper
|
||||
import org.session.libsession.snode.RawResponsePromise
|
||||
import org.session.libsession.snode.SnodeAPI
|
||||
import org.session.libsession.snode.SnodeModule
|
||||
import org.session.libsession.snode.SnodeMessage
|
||||
import org.session.libsession.utilities.GroupUtil
|
||||
import org.session.libsession.utilities.SSKEnvironment
|
||||
import org.session.libsignal.service.internal.push.PushTransportDetails
|
||||
import org.session.libsignal.service.internal.push.SignalServiceProtos
|
||||
@@ -62,7 +63,7 @@ object MessageSender {
|
||||
|
||||
// Convenience
|
||||
fun send(message: Message, destination: Destination): Promise<Unit, Exception> {
|
||||
if (destination is Destination.OpenGroup) {
|
||||
if (destination is Destination.OpenGroup || destination is Destination.OpenGroupV2) {
|
||||
return sendToOpenGroupDestination(destination, message)
|
||||
}
|
||||
return sendToSnodeDestination(destination, message)
|
||||
@@ -90,7 +91,8 @@ object MessageSender {
|
||||
when (destination) {
|
||||
is Destination.Contact -> message.recipient = destination.publicKey
|
||||
is Destination.ClosedGroup -> message.recipient = destination.groupPublicKey
|
||||
is Destination.OpenGroup -> throw Error.PreconditionFailure("Destination should not be open groups!")
|
||||
is Destination.OpenGroup,
|
||||
is Destination.OpenGroupV2 -> throw Error.PreconditionFailure("Destination should not be open groups!")
|
||||
}
|
||||
// Validate the message
|
||||
if (!message.isValid()) { throw Error.InvalidMessage }
|
||||
@@ -109,9 +111,9 @@ object MessageSender {
|
||||
if (message is VisibleMessage) {
|
||||
val displayName = storage.getUserDisplayName()!!
|
||||
val profileKey = storage.getUserProfileKey()
|
||||
val profilePrictureUrl = storage.getUserProfilePictureURL()
|
||||
if (profileKey != null && profilePrictureUrl != null) {
|
||||
message.profile = Profile(displayName, profileKey, profilePrictureUrl)
|
||||
val profilePictureUrl = storage.getUserProfilePictureURL()
|
||||
if (profileKey != null && profilePictureUrl != null) {
|
||||
message.profile = Profile(displayName, profileKey, profilePictureUrl)
|
||||
} else {
|
||||
message.profile = Profile(displayName)
|
||||
}
|
||||
@@ -128,7 +130,8 @@ object MessageSender {
|
||||
val encryptionKeyPair = MessagingModuleConfiguration.shared.storage.getLatestClosedGroupEncryptionKeyPair(destination.groupPublicKey)!!
|
||||
ciphertext = MessageSenderEncryption.encryptWithSessionProtocol(plaintext, encryptionKeyPair.hexEncodedPublicKey)
|
||||
}
|
||||
is Destination.OpenGroup -> throw Error.PreconditionFailure("Destination should not be open groups!")
|
||||
is Destination.OpenGroup,
|
||||
is Destination.OpenGroupV2 -> throw Error.PreconditionFailure("Destination should not be open groups!")
|
||||
}
|
||||
// Wrap the result
|
||||
val kind: SignalServiceProtos.Envelope.Type
|
||||
@@ -142,7 +145,8 @@ object MessageSender {
|
||||
kind = SignalServiceProtos.Envelope.Type.CLOSED_GROUP_MESSAGE
|
||||
senderPublicKey = destination.groupPublicKey
|
||||
}
|
||||
is Destination.OpenGroup -> throw Error.PreconditionFailure("Destination should not be open groups!")
|
||||
is Destination.OpenGroup,
|
||||
is Destination.OpenGroupV2 -> throw Error.PreconditionFailure("Destination should not be open groups!")
|
||||
}
|
||||
val wrappedMessage = MessageWrapper.wrap(kind, message.sentTimestamp!!, senderPublicKey, ciphertext)
|
||||
// Send the result
|
||||
@@ -150,6 +154,7 @@ object MessageSender {
|
||||
SnodeModule.shared.broadcaster.broadcast("calculatingPoW", message.sentTimestamp!!)
|
||||
}
|
||||
val base64EncodedData = Base64.encodeBytes(wrappedMessage)
|
||||
// Send the result
|
||||
val snodeMessage = SnodeMessage(message.recipient!!, base64EncodedData, message.ttl, message.sentTimestamp!!)
|
||||
if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend) {
|
||||
SnodeModule.shared.broadcaster.broadcast("sendingMessage", message.sentTimestamp!!)
|
||||
@@ -204,32 +209,71 @@ object MessageSender {
|
||||
deferred.reject(error)
|
||||
}
|
||||
try {
|
||||
val server: String
|
||||
val channel: Long
|
||||
when (destination) {
|
||||
is Destination.Contact -> throw Error.PreconditionFailure("Destination should not be contacts!")
|
||||
is Destination.ClosedGroup -> throw Error.PreconditionFailure("Destination should not be closed groups!")
|
||||
is Destination.OpenGroup -> {
|
||||
message.recipient = "${destination.server}.${destination.channel}"
|
||||
server = destination.server
|
||||
channel = destination.channel
|
||||
val server = destination.server
|
||||
val channel = destination.channel
|
||||
|
||||
// Validate the message
|
||||
if (message !is VisibleMessage || !message.isValid()) {
|
||||
throw Error.InvalidMessage
|
||||
}
|
||||
|
||||
// Convert the message to an open group message
|
||||
val openGroupMessage = OpenGroupMessage.from(message, server) ?: run {
|
||||
throw Error.InvalidMessage
|
||||
}
|
||||
// Send the result
|
||||
OpenGroupAPI.sendMessage(openGroupMessage, channel, server).success {
|
||||
message.openGroupServerMessageID = it.serverID
|
||||
handleSuccessfulMessageSend(message, destination)
|
||||
deferred.resolve(Unit)
|
||||
}.fail {
|
||||
handleFailure(it)
|
||||
}
|
||||
}
|
||||
is Destination.OpenGroupV2 -> {
|
||||
message.recipient = "${destination.server}.${destination.room}"
|
||||
val server = destination.server
|
||||
val room = destination.room
|
||||
|
||||
// Attach the user's profile if needed
|
||||
if (message is VisibleMessage) {
|
||||
val displayName = storage.getUserDisplayName()!!
|
||||
val profileKey = storage.getUserProfileKey()
|
||||
val profilePictureUrl = storage.getUserProfilePictureURL()
|
||||
if (profileKey != null && profilePictureUrl != null) {
|
||||
message.profile = Profile(displayName, profileKey, profilePictureUrl)
|
||||
} else {
|
||||
message.profile = Profile(displayName)
|
||||
}
|
||||
}
|
||||
|
||||
// Validate the message
|
||||
if (message !is VisibleMessage || !message.isValid()) {
|
||||
throw Error.InvalidMessage
|
||||
}
|
||||
|
||||
val proto = message.toProto()!!
|
||||
|
||||
val openGroupMessage = OpenGroupMessageV2(
|
||||
sender = message.sender,
|
||||
sentTimestamp = message.sentTimestamp!!,
|
||||
base64EncodedData = Base64.encodeBytes(proto.toByteArray()),
|
||||
)
|
||||
|
||||
OpenGroupAPIV2.send(openGroupMessage,room,server).success {
|
||||
message.openGroupServerMessageID = it.serverID
|
||||
handleSuccessfulMessageSend(message, destination)
|
||||
deferred.resolve(Unit)
|
||||
}.fail {
|
||||
handleFailure(it)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
// Validate the message
|
||||
if (message !is VisibleMessage || !message.isValid()) {
|
||||
throw Error.InvalidMessage
|
||||
}
|
||||
// Convert the message to an open group message
|
||||
val openGroupMessage = OpenGroupMessage.from(message, server) ?: kotlin.run {
|
||||
throw Error.InvalidMessage
|
||||
}
|
||||
// Send the result
|
||||
OpenGroupAPI.sendMessage(openGroupMessage, channel, server).success {
|
||||
message.openGroupServerMessageID = it.serverID
|
||||
handleSuccessfulMessageSend(message, destination)
|
||||
deferred.resolve(Unit)
|
||||
}.fail {
|
||||
handleFailure(it)
|
||||
}
|
||||
} catch (exception: Exception) {
|
||||
handleFailure(exception)
|
||||
@@ -245,8 +289,12 @@ object MessageSender {
|
||||
// Ignore future self-sends
|
||||
storage.addReceivedMessageTimestamp(message.sentTimestamp!!)
|
||||
// Track the open group server message ID
|
||||
if (message.openGroupServerMessageID != null) {
|
||||
storage.setOpenGroupServerMessageID(messageId, message.openGroupServerMessageID!!)
|
||||
if (message.openGroupServerMessageID != null && destination is Destination.OpenGroupV2) {
|
||||
val encoded = GroupUtil.getEncodedOpenGroupID("${destination.server}.${destination.room}".toByteArray())
|
||||
val threadID = storage.getThreadIdFor(Address.fromSerialized(encoded))
|
||||
if (threadID != null && threadID >= 0) {
|
||||
storage.setOpenGroupServerMessageID(messageId, message.openGroupServerMessageID!!, threadID, !(message as VisibleMessage).isMediaMessage())
|
||||
}
|
||||
}
|
||||
// Mark the message as sent
|
||||
storage.markAsSent(message.sentTimestamp!!, message.sender?:userPublicKey)
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package org.session.libsession.messaging.sending_receiving
|
||||
|
||||
import android.text.TextUtils
|
||||
import okhttp3.HttpUrl
|
||||
import org.session.libsession.messaging.MessagingModuleConfiguration
|
||||
import org.session.libsession.messaging.jobs.AttachmentDownloadJob
|
||||
import org.session.libsession.messaging.jobs.JobQueue
|
||||
@@ -128,8 +129,9 @@ private fun handleConfigurationMessage(message: ConfigurationMessage) {
|
||||
handleNewClosedGroup(message.sender!!, message.sentTimestamp!!, closeGroup.publicKey, closeGroup.name, closeGroup.encryptionKeyPair!!, closeGroup.members, closeGroup.admins, message.sentTimestamp!!)
|
||||
}
|
||||
val allOpenGroups = storage.getAllOpenGroups().map { it.value.server }
|
||||
val allV2OpenGroups = storage.getAllV2OpenGroups().map { it.value.toJoinUrl() }
|
||||
for (openGroup in message.openGroups) {
|
||||
if (allOpenGroups.contains(openGroup)) continue
|
||||
if (allOpenGroups.contains(openGroup) || allV2OpenGroups.contains(openGroup)) continue
|
||||
storage.addOpenGroup(openGroup, 1)
|
||||
}
|
||||
if (message.displayName.isNotEmpty()) {
|
||||
@@ -164,16 +166,27 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
|
||||
val storage = MessagingModuleConfiguration.shared.storage
|
||||
val context = MessagingModuleConfiguration.shared.context
|
||||
val userPublicKey = storage.getUserPublicKey()
|
||||
|
||||
// Get or create thread
|
||||
val threadID = storage.getOrCreateThreadIdFor(message.syncTarget
|
||||
?: message.sender!!, message.groupPublicKey, openGroupID)
|
||||
|
||||
val openGroup = threadID.let {
|
||||
storage.getOpenGroup(it.toString())
|
||||
}
|
||||
|
||||
// Update profile if needed
|
||||
val newProfile = message.profile
|
||||
if (newProfile != null && openGroupID.isNullOrEmpty() && userPublicKey != message.sender) {
|
||||
|
||||
if (newProfile != null && userPublicKey != message.sender && openGroup == null) {
|
||||
val profileManager = SSKEnvironment.shared.profileManager
|
||||
val recipient = Recipient.from(context, Address.fromSerialized(message.sender!!), false)
|
||||
val displayName = newProfile.displayName!!
|
||||
if (displayName.isNotEmpty()) {
|
||||
profileManager.setDisplayName(context, recipient, displayName)
|
||||
}
|
||||
if (newProfile.profileKey?.isNotEmpty() == true && !MessageDigest.isEqual(recipient.profileKey, newProfile.profileKey)) {
|
||||
if (newProfile.profileKey?.isNotEmpty() == true
|
||||
&& (recipient.profileKey == null || !MessageDigest.isEqual(recipient.profileKey, newProfile.profileKey))) {
|
||||
profileManager.setProfileKey(context, recipient, newProfile.profileKey!!)
|
||||
profileManager.setUnidentifiedAccessMode(context, recipient, Recipient.UnidentifiedAccessMode.UNKNOWN)
|
||||
val newUrl = newProfile.profilePictureURL
|
||||
@@ -185,9 +198,6 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
|
||||
}
|
||||
}
|
||||
}
|
||||
// Get or create thread
|
||||
val threadID = storage.getOrCreateThreadIdFor(message.syncTarget
|
||||
?: message.sender!!, message.groupPublicKey, openGroupID)
|
||||
// Parse quote if needed
|
||||
var quoteModel: QuoteModel? = null
|
||||
if (message.quote != null && proto.dataMessage.hasQuote()) {
|
||||
@@ -228,7 +238,7 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
|
||||
// Parse stickers if needed
|
||||
// Persist the message
|
||||
message.threadID = threadID
|
||||
val messageID = storage.persist(message, quoteModel, linkPreviews, message.groupPublicKey, openGroupID, attachments) ?: throw MessageReceiver.Error.NoThread
|
||||
val messageID = storage.persist(message, quoteModel, linkPreviews, message.groupPublicKey, openGroupID, attachments) ?: throw MessageReceiver.Error.DuplicateMessage
|
||||
// Parse & persist attachments
|
||||
// Start attachment downloads if needed
|
||||
storage.getAttachmentsForMessage(messageID).forEach { attachment ->
|
||||
@@ -237,6 +247,10 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
|
||||
JobQueue.shared.add(downloadJob)
|
||||
}
|
||||
}
|
||||
val openGroupServerID = message.openGroupServerMessageID
|
||||
if (openGroupServerID != null) {
|
||||
storage.setOpenGroupServerMessageID(messageID, openGroupServerID, threadID, !(message.isMediaMessage() || attachments.isNotEmpty()))
|
||||
}
|
||||
// Cancel any typing indicators if needed
|
||||
cancelTypingIndicatorsIfNeeded(message.sender!!)
|
||||
//Notify the user if needed
|
||||
|
@@ -69,9 +69,6 @@ class ClosedGroupPoller {
|
||||
// ignore inactive group's messages
|
||||
return@successBackground
|
||||
}
|
||||
if (messages.isNotEmpty()) {
|
||||
Log.d("Loki", "Received ${messages.count()} new message(s) in closed group with public key: $publicKey.")
|
||||
}
|
||||
messages.forEach { envelope ->
|
||||
val job = MessageReceiveJob(envelope.toByteArray(), false)
|
||||
JobQueue.shared.add(job)
|
||||
|
@@ -9,6 +9,8 @@ import org.session.libsession.messaging.jobs.MessageReceiveJob
|
||||
import org.session.libsession.messaging.open_groups.OpenGroup
|
||||
import org.session.libsession.messaging.open_groups.OpenGroupAPI
|
||||
import org.session.libsession.messaging.open_groups.OpenGroupMessage
|
||||
import org.session.libsession.messaging.threads.Address
|
||||
import org.session.libsession.utilities.GroupUtil
|
||||
import org.session.libsignal.service.internal.push.SignalServiceProtos.*
|
||||
import org.session.libsignal.utilities.logging.Log
|
||||
import org.session.libsignal.utilities.successBackground
|
||||
@@ -72,7 +74,6 @@ class OpenGroupPoller(private val openGroup: OpenGroup, private val executorServ
|
||||
// Kovenant propagates a context to chained promises, so OpenGroupAPI.sharedContext should be used for all of the below
|
||||
OpenGroupAPI.getMessages(openGroup.channel, openGroup.server).successBackground { messages ->
|
||||
// Process messages in the background
|
||||
Log.d("Loki", "received ${messages.size} messages")
|
||||
messages.forEach { message ->
|
||||
try {
|
||||
val senderPublicKey = message.senderPublicKey
|
||||
@@ -211,10 +212,13 @@ class OpenGroupPoller(private val openGroup: OpenGroup, private val executorServ
|
||||
}
|
||||
|
||||
private fun pollForDeletedMessages() {
|
||||
val messagingModule = MessagingModuleConfiguration.shared
|
||||
val address = GroupUtil.getEncodedOpenGroupID(openGroup.id.toByteArray())
|
||||
val threadId = messagingModule.storage.getThreadIdFor(Address.fromSerialized(address)) ?: return
|
||||
OpenGroupAPI.getDeletedMessageServerIDs(openGroup.channel, openGroup.server).success { deletedMessageServerIDs ->
|
||||
val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { MessagingModuleConfiguration.shared.messageDataProvider.getMessageID(it) }
|
||||
deletedMessageIDs.forEach {
|
||||
MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(it)
|
||||
val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { messagingModule.messageDataProvider.getMessageID(it, threadId) }
|
||||
deletedMessageIDs.forEach { (messageId, isSms) ->
|
||||
MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms)
|
||||
}
|
||||
}.fail {
|
||||
Log.d("Loki", "Failed to get deleted messages for group chat with ID: ${openGroup.channel} on server: ${openGroup.server}.")
|
||||
|
@@ -0,0 +1,130 @@
|
||||
package org.session.libsession.messaging.sending_receiving.pollers
|
||||
|
||||
import nl.komponents.kovenant.Promise
|
||||
import org.session.libsession.messaging.MessagingModuleConfiguration
|
||||
import org.session.libsession.messaging.jobs.JobQueue
|
||||
import org.session.libsession.messaging.jobs.MessageReceiveJob
|
||||
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
|
||||
import org.session.libsession.messaging.open_groups.OpenGroupMessageV2
|
||||
import org.session.libsession.messaging.open_groups.OpenGroupV2
|
||||
import org.session.libsession.messaging.threads.Address
|
||||
import org.session.libsession.utilities.GroupUtil
|
||||
import org.session.libsignal.service.internal.push.SignalServiceProtos
|
||||
import org.session.libsignal.utilities.logging.Log
|
||||
import org.session.libsignal.utilities.successBackground
|
||||
import java.util.concurrent.ScheduledExecutorService
|
||||
import java.util.concurrent.ScheduledFuture
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class OpenGroupV2Poller(private val openGroups: List<OpenGroupV2>, private val executorService: ScheduledExecutorService? = null) {
|
||||
|
||||
private var hasStarted = false
|
||||
@Volatile private var isPollOngoing = false
|
||||
var isCaughtUp = false
|
||||
|
||||
private val cancellableFutures = mutableListOf<ScheduledFuture<out Any>>()
|
||||
|
||||
// use this as a receive time-based window to calculate re-poll interval
|
||||
private val receivedQueue = ArrayDeque<Long>(50)
|
||||
|
||||
private fun calculatePollInterval(): Long {
|
||||
// sample last default poll time * 2
|
||||
while (receivedQueue.size > 50) {
|
||||
receivedQueue.removeLast()
|
||||
}
|
||||
val sampleWindow = System.currentTimeMillis() - pollForNewMessagesInterval * 2
|
||||
val numberInSample = receivedQueue.toList().filter { it > sampleWindow }.size.coerceAtLeast(1)
|
||||
return ((2 + (50 / numberInSample / 20)*5) * 1000).toLong()
|
||||
}
|
||||
|
||||
// region Settings
|
||||
companion object {
|
||||
private val pollForNewMessagesInterval: Long = 10 * 1000
|
||||
}
|
||||
// endregion
|
||||
|
||||
// region Lifecycle
|
||||
fun startIfNeeded() {
|
||||
if (hasStarted || executorService == null) return
|
||||
cancellableFutures += executorService.schedule(::compactPoll, 0, TimeUnit.MILLISECONDS)
|
||||
hasStarted = true
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
cancellableFutures.forEach { future ->
|
||||
future.cancel(false)
|
||||
}
|
||||
cancellableFutures.clear()
|
||||
hasStarted = false
|
||||
}
|
||||
// endregion
|
||||
|
||||
// region Polling
|
||||
|
||||
private fun compactPoll(): Promise<Any, Exception> {
|
||||
return compactPoll(false)
|
||||
}
|
||||
|
||||
fun compactPoll(isBackgroundPoll: Boolean): Promise<Any, Exception> {
|
||||
if (isPollOngoing || !hasStarted) return Promise.of(Unit)
|
||||
isPollOngoing = true
|
||||
val server = openGroups.first().server // assume all the same server
|
||||
val rooms = openGroups.map { it.room }
|
||||
return OpenGroupAPIV2.getCompactPoll(rooms = rooms, server).successBackground { results ->
|
||||
results.forEach { (room, results) ->
|
||||
val serverRoomId = "$server.$room"
|
||||
handleDeletedMessages(serverRoomId,results.deletions)
|
||||
handleNewMessages(serverRoomId, results.messages.sortedBy { it.serverID }, isBackgroundPoll)
|
||||
}
|
||||
}.always {
|
||||
isPollOngoing = false
|
||||
if (!isBackgroundPoll) {
|
||||
val delay = calculatePollInterval()
|
||||
executorService?.schedule(this@OpenGroupV2Poller::compactPoll, delay, TimeUnit.MILLISECONDS)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleNewMessages(serverRoomId: String, newMessages: List<OpenGroupMessageV2>, isBackgroundPoll: Boolean) {
|
||||
if (!hasStarted) return
|
||||
newMessages.forEach { message ->
|
||||
try {
|
||||
val senderPublicKey = message.sender!!
|
||||
// Main message
|
||||
// Envelope
|
||||
val builder = SignalServiceProtos.Envelope.newBuilder()
|
||||
builder.type = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE
|
||||
builder.source = senderPublicKey
|
||||
builder.sourceDevice = 1
|
||||
builder.content = message.toProto().toByteString()
|
||||
builder.timestamp = message.sentTimestamp
|
||||
val envelope = builder.build()
|
||||
val job = MessageReceiveJob(envelope.toByteArray(), isBackgroundPoll, message.serverID, serverRoomId)
|
||||
Log.d("Loki", "Scheduling Job $job")
|
||||
if (isBackgroundPoll) {
|
||||
job.executeAsync()
|
||||
// The promise is just used to keep track of when we're done
|
||||
} else {
|
||||
JobQueue.shared.add(job)
|
||||
}
|
||||
receivedQueue.addFirst(message.sentTimestamp)
|
||||
} catch (e: Exception) {
|
||||
Log.e("Loki", "Exception parsing message", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleDeletedMessages(serverRoomId: String, deletedMessageServerIDs: List<Long>) {
|
||||
val messagingModule = MessagingModuleConfiguration.shared
|
||||
val address = GroupUtil.getEncodedOpenGroupID(serverRoomId.toByteArray())
|
||||
val threadId = messagingModule.storage.getThreadIdFor(Address.fromSerialized(address)) ?: return
|
||||
|
||||
val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { serverId ->
|
||||
messagingModule.messageDataProvider.getMessageID(serverId, threadId)
|
||||
}
|
||||
deletedMessageIDs.forEach { (messageId, isSms) ->
|
||||
MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms)
|
||||
}
|
||||
}
|
||||
// endregion
|
||||
}
|
@@ -10,7 +10,6 @@ import org.session.libsession.messaging.MessagingModuleConfiguration
|
||||
import org.session.libsession.snode.OnionRequestAPI
|
||||
import org.session.libsession.messaging.file_server.FileServerAPI
|
||||
|
||||
import org.session.libsignal.utilities.logging.Log
|
||||
import org.session.libsignal.utilities.DiffieHellman
|
||||
import org.session.libsignal.service.api.crypto.ProfileCipherOutputStream
|
||||
import org.session.libsignal.service.api.messages.SignalServiceAttachment
|
||||
@@ -27,7 +26,7 @@ import org.session.libsignal.service.loki.HTTP
|
||||
import org.session.libsignal.service.loki.utilities.*
|
||||
import org.session.libsignal.utilities.*
|
||||
import org.session.libsignal.utilities.Base64
|
||||
|
||||
import org.session.libsignal.utilities.logging.Log
|
||||
import java.io.File
|
||||
import java.io.FileOutputStream
|
||||
import java.io.OutputStream
|
||||
@@ -179,80 +178,9 @@ open class DotNetAPI {
|
||||
return execute(HTTPVerb.PATCH, server, "users/me", parameters = parameters)
|
||||
}
|
||||
|
||||
// DOWNLOAD
|
||||
|
||||
/**
|
||||
* Blocks the calling thread.
|
||||
*/
|
||||
fun downloadFile(destination: File, url: String, listener: SignalServiceAttachment.ProgressListener?) {
|
||||
val outputStream = FileOutputStream(destination) // Throws
|
||||
var remainingAttempts = 4
|
||||
var exception: Exception? = null
|
||||
while (remainingAttempts > 0) {
|
||||
remainingAttempts -= 1
|
||||
try {
|
||||
downloadFile(outputStream, url, listener)
|
||||
exception = null
|
||||
break
|
||||
} catch (e: Exception) {
|
||||
exception = e
|
||||
}
|
||||
}
|
||||
if (exception != null) { throw exception }
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocks the calling thread.
|
||||
*/
|
||||
fun downloadFile(outputStream: OutputStream, url: String, listener: SignalServiceAttachment.ProgressListener?) {
|
||||
// We need to throw a PushNetworkException or NonSuccessfulResponseCodeException
|
||||
// because the underlying Signal logic requires these to work correctly
|
||||
val oldPrefixedHost = "https://" + HttpUrl.get(url).host()
|
||||
var newPrefixedHost = oldPrefixedHost
|
||||
if (oldPrefixedHost.contains(FileServerAPI.fileStorageBucketURL)) {
|
||||
newPrefixedHost = FileServerAPI.shared.server
|
||||
}
|
||||
// Edge case that needs to work: https://file-static.lokinet.org/i1pNmpInq3w9gF3TP8TFCa1rSo38J6UM
|
||||
// → https://file.getsession.org/loki/v1/f/XLxogNXVEIWHk14NVCDeppzTujPHxu35
|
||||
val fileID = url.substringAfter(oldPrefixedHost).substringAfter("/f/")
|
||||
val sanitizedURL = "$newPrefixedHost/loki/v1/f/$fileID"
|
||||
val request = Request.Builder().url(sanitizedURL).get()
|
||||
try {
|
||||
val serverPublicKey = if (newPrefixedHost.contains(FileServerAPI.shared.server)) FileServerAPI.fileServerPublicKey
|
||||
else FileServerAPI.shared.getPublicKeyForOpenGroupServer(newPrefixedHost).get()
|
||||
val json = OnionRequestAPI.sendOnionRequest(request.build(), newPrefixedHost, serverPublicKey, isJSONRequired = false).get()
|
||||
val result = json["result"] as? String
|
||||
if (result == null) {
|
||||
Log.d("Loki", "Couldn't parse attachment from: $json.")
|
||||
throw PushNetworkException("Missing response body.")
|
||||
}
|
||||
val body = Base64.decode(result)
|
||||
if (body.size > FileServerAPI.maxFileSize) {
|
||||
Log.d("Loki", "Attachment size limit exceeded.")
|
||||
throw PushNetworkException("Max response size exceeded.")
|
||||
}
|
||||
val input = body.inputStream()
|
||||
val buffer = ByteArray(32768)
|
||||
var count = 0
|
||||
var bytes = input.read(buffer)
|
||||
while (bytes >= 0) {
|
||||
outputStream.write(buffer, 0, bytes)
|
||||
count += bytes
|
||||
if (count > FileServerAPI.maxFileSize) {
|
||||
Log.d("Loki", "Attachment size limit exceeded.")
|
||||
throw PushNetworkException("Max response size exceeded.")
|
||||
}
|
||||
listener?.onAttachmentProgress(body.size.toLong(), count.toLong())
|
||||
bytes = input.read(buffer)
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
Log.d("Loki", "Couldn't download attachment due to error: $e.")
|
||||
throw if (e is NonSuccessfulResponseCodeException) e else PushNetworkException(e)
|
||||
}
|
||||
}
|
||||
|
||||
// UPLOAD
|
||||
|
||||
// TODO: migrate to v2 file server
|
||||
@Throws(PushNetworkException::class, NonSuccessfulResponseCodeException::class)
|
||||
fun uploadAttachment(server: String, attachment: PushAttachmentData): UploadResult {
|
||||
// This function mimics what Signal does in PushServiceSocket
|
||||
@@ -269,13 +197,13 @@ open class DotNetAPI {
|
||||
return upload(server, request) { json -> // Retrying is handled by AttachmentUploadJob
|
||||
val data = json["data"] as? Map<*, *>
|
||||
if (data == null) {
|
||||
Log.d("Loki", "Couldn't parse attachment from: $json.")
|
||||
Log.e("Loki", "Couldn't parse attachment from: $json.")
|
||||
throw Error.ParsingFailed
|
||||
}
|
||||
val id = data["id"] as? Long ?: (data["id"] as? Int)?.toLong() ?: (data["id"] as? String)?.toLong()
|
||||
val url = data["url"] as? String
|
||||
if (id == null || url == null || url.isEmpty()) {
|
||||
Log.d("Loki", "Couldn't parse upload from: $json.")
|
||||
Log.e("Loki", "Couldn't parse upload from: $json.")
|
||||
throw Error.ParsingFailed
|
||||
}
|
||||
UploadResult(id, url, file.transmittedDigest)
|
||||
|
@@ -14,7 +14,6 @@ import org.session.libsignal.utilities.*
|
||||
import org.session.libsignal.service.loki.Snode
|
||||
import org.session.libsignal.service.loki.*
|
||||
import org.session.libsession.utilities.AESGCM.EncryptionResult
|
||||
import org.session.libsignal.utilities.ThreadUtils
|
||||
import org.session.libsession.utilities.getBodyForOnionRequest
|
||||
import org.session.libsession.utilities.getHeadersForOnionRequest
|
||||
import org.session.libsignal.service.loki.Broadcaster
|
||||
@@ -82,7 +81,7 @@ object OnionRequestAPI {
|
||||
|
||||
internal sealed class Destination {
|
||||
class Snode(val snode: org.session.libsignal.service.loki.Snode) : Destination()
|
||||
class Server(val host: String, val target: String, val x25519PublicKey: String) : Destination()
|
||||
class Server(val host: String, val target: String, val x25519PublicKey: String, val scheme: String, val port: Int) : Destination()
|
||||
}
|
||||
|
||||
// region Private API
|
||||
@@ -330,7 +329,7 @@ object OnionRequestAPI {
|
||||
val plaintext = AESGCM.decrypt(ivAndCiphertext, destinationSymmetricKey)
|
||||
try {
|
||||
@Suppress("NAME_SHADOWING") val json = JsonUtil.fromJson(plaintext.toString(Charsets.UTF_8), Map::class.java)
|
||||
val statusCode = json["status"] as Int
|
||||
val statusCode = json["status_code"] as? Int ?: json["status"] as Int
|
||||
if (statusCode == 406) {
|
||||
@Suppress("NAME_SHADOWING") val body = mapOf( "result" to "Your clock is out of sync with the service node network." )
|
||||
val exception = HTTPRequestFailedAtDestinationException(statusCode, body)
|
||||
@@ -454,7 +453,7 @@ object OnionRequestAPI {
|
||||
val urlAsString = url.toString()
|
||||
val host = url.host()
|
||||
val endpoint = when {
|
||||
server.count() < urlAsString.count() -> urlAsString.substringAfter("$server/")
|
||||
server.count() < urlAsString.count() -> urlAsString.substringAfter(server).removePrefix("/")
|
||||
else -> ""
|
||||
}
|
||||
val body = request.getBodyForOnionRequest() ?: "null"
|
||||
@@ -464,7 +463,8 @@ object OnionRequestAPI {
|
||||
"method" to request.method(),
|
||||
"headers" to headers
|
||||
)
|
||||
val destination = Destination.Server(host, target, x25519PublicKey)
|
||||
url.isHttps
|
||||
val destination = Destination.Server(host, target, x25519PublicKey, url.scheme(), url.port())
|
||||
return sendOnionRequest(destination, payload, isJSONRequired).recover { exception ->
|
||||
Log.d("Loki", "Couldn't reach server: $urlAsString due to error: $exception.")
|
||||
throw exception
|
||||
|
@@ -70,7 +70,13 @@ object OnionRequestEncryption {
|
||||
payload = mutableMapOf( "destination" to rhs.snode.publicKeySet!!.ed25519Key )
|
||||
}
|
||||
is OnionRequestAPI.Destination.Server -> {
|
||||
payload = mutableMapOf( "host" to rhs.host, "target" to rhs.target, "method" to "POST" )
|
||||
payload = mutableMapOf(
|
||||
"host" to rhs.host,
|
||||
"target" to rhs.target,
|
||||
"method" to "POST",
|
||||
"protocol" to rhs.scheme,
|
||||
"port" to rhs.port
|
||||
)
|
||||
}
|
||||
}
|
||||
payload["ephemeral_key"] = previousEncryptionResult.ephemeralPublicKey.toHexString()
|
||||
|
@@ -33,7 +33,7 @@ object SnodeAPI {
|
||||
|
||||
// Settings
|
||||
private val maxRetryCount = 6
|
||||
private val minimumSnodePoolCount = 24
|
||||
private val minimumSnodePoolCount = 12
|
||||
private val minimumSwarmSnodeCount = 2
|
||||
// Use port 4433 if the API level can handle the network security configuration and enforce pinned certificates
|
||||
private val seedNodePort = if (Build.VERSION.SDK_INT < Build.VERSION_CODES.N) 443 else 4433
|
||||
@@ -41,7 +41,7 @@ object SnodeAPI {
|
||||
if (useTestnet) {
|
||||
setOf( "http://public.loki.foundation:38157" )
|
||||
} else {
|
||||
setOf( "https://storage.seed1.loki.network:$seedNodePort", "https://storage.seed3.loki.network:$seedNodePort", "https://public.loki.foundation:$seedNodePort" )
|
||||
setOf( "https://storage.seed1.loki.network:$seedNodePort ", "https://storage.seed3.loki.network:$seedNodePort ", "https://public.loki.foundation:$seedNodePort" )
|
||||
}
|
||||
}
|
||||
private val snodeFailureThreshold = 4
|
||||
@@ -92,6 +92,7 @@ object SnodeAPI {
|
||||
"method" to "get_n_service_nodes",
|
||||
"params" to mapOf(
|
||||
"active_only" to true,
|
||||
"limit" to 256,
|
||||
"fields" to mapOf( "public_ip" to true, "storage_port" to true, "pubkey_x25519" to true, "pubkey_ed25519" to true )
|
||||
)
|
||||
)
|
||||
|
@@ -16,10 +16,10 @@ data class SnodeMessage(
|
||||
internal fun toJSON(): Map<String, String> {
|
||||
return mapOf(
|
||||
"pubKey" to if (SnodeAPI.useTestnet) recipient.removing05PrefixIfNeeded() else recipient,
|
||||
"data" to data,
|
||||
"ttl" to ttl.toString(),
|
||||
"timestamp" to timestamp.toString(),
|
||||
"nonce" to ""
|
||||
"data" to data,
|
||||
"ttl" to ttl.toString(),
|
||||
"timestamp" to timestamp.toString(),
|
||||
"nonce" to ""
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@@ -1,14 +1,16 @@
|
||||
package org.session.libsession.utilities
|
||||
|
||||
import org.whispersystems.curve25519.Curve25519
|
||||
import androidx.annotation.WorkerThread
|
||||
import org.session.libsignal.libsignal.util.ByteUtil
|
||||
import org.session.libsignal.service.internal.util.Util
|
||||
import org.session.libsignal.utilities.Hex
|
||||
import org.whispersystems.curve25519.Curve25519
|
||||
import javax.crypto.Cipher
|
||||
import javax.crypto.Mac
|
||||
import javax.crypto.spec.GCMParameterSpec
|
||||
import javax.crypto.spec.SecretKeySpec
|
||||
|
||||
@WorkerThread
|
||||
internal object AESGCM {
|
||||
|
||||
internal data class EncryptionResult(
|
||||
@@ -31,6 +33,16 @@ internal object AESGCM {
|
||||
return cipher.doFinal(ciphertext)
|
||||
}
|
||||
|
||||
/**
|
||||
* Sync. Don't call from the main thread.
|
||||
*/
|
||||
internal fun generateSymmetricKey(x25519PublicKey: ByteArray, x25519PrivateKey: ByteArray): ByteArray {
|
||||
val ephemeralSharedSecret = Curve25519.getInstance(Curve25519.BEST).calculateAgreement(x25519PublicKey, x25519PrivateKey)
|
||||
val mac = Mac.getInstance("HmacSHA256")
|
||||
mac.init(SecretKeySpec("LOKI".toByteArray(), "HmacSHA256"))
|
||||
return mac.doFinal(ephemeralSharedSecret)
|
||||
}
|
||||
|
||||
/**
|
||||
* Sync. Don't call from the main thread.
|
||||
*/
|
||||
@@ -47,10 +59,7 @@ internal object AESGCM {
|
||||
internal fun encrypt(plaintext: ByteArray, hexEncodedX25519PublicKey: String): EncryptionResult {
|
||||
val x25519PublicKey = Hex.fromStringCondensed(hexEncodedX25519PublicKey)
|
||||
val ephemeralKeyPair = Curve25519.getInstance(Curve25519.BEST).generateKeyPair()
|
||||
val ephemeralSharedSecret = Curve25519.getInstance(Curve25519.BEST).calculateAgreement(x25519PublicKey, ephemeralKeyPair.privateKey)
|
||||
val mac = Mac.getInstance("HmacSHA256")
|
||||
mac.init(SecretKeySpec("LOKI".toByteArray(), "HmacSHA256"))
|
||||
val symmetricKey = mac.doFinal(ephemeralSharedSecret)
|
||||
val symmetricKey = generateSymmetricKey(x25519PublicKey, ephemeralKeyPair.privateKey)
|
||||
val ciphertext = encrypt(plaintext, symmetricKey)
|
||||
return EncryptionResult(ciphertext, symmetricKey, ephemeralKeyPair.publicKey)
|
||||
}
|
||||
|
@@ -3,7 +3,9 @@ package org.session.libsession.utilities
|
||||
import okhttp3.HttpUrl
|
||||
import okhttp3.Request
|
||||
import org.session.libsession.messaging.file_server.FileServerAPI
|
||||
import org.session.libsession.messaging.file_server.FileServerAPIV2
|
||||
import org.session.libsession.snode.OnionRequestAPI
|
||||
import org.session.libsignal.service.api.crypto.AttachmentCipherInputStream
|
||||
import org.session.libsignal.utilities.logging.Log
|
||||
import org.session.libsignal.service.api.messages.SignalServiceAttachment
|
||||
import org.session.libsignal.service.api.push.exceptions.NonSuccessfulResponseCodeException
|
||||
@@ -39,50 +41,64 @@ object DownloadUtilities {
|
||||
*/
|
||||
@JvmStatic
|
||||
fun downloadFile(outputStream: OutputStream, url: String, maxSize: Int, listener: SignalServiceAttachment.ProgressListener?) {
|
||||
// We need to throw a PushNetworkException or NonSuccessfulResponseCodeException
|
||||
// because the underlying Signal logic requires these to work correctly
|
||||
val oldPrefixedHost = "https://" + HttpUrl.get(url).host()
|
||||
var newPrefixedHost = oldPrefixedHost
|
||||
if (oldPrefixedHost.contains(FileServerAPI.fileStorageBucketURL)) {
|
||||
newPrefixedHost = FileServerAPI.shared.server
|
||||
}
|
||||
// Edge case that needs to work: https://file-static.lokinet.org/i1pNmpInq3w9gF3TP8TFCa1rSo38J6UM
|
||||
// → https://file.getsession.org/loki/v1/f/XLxogNXVEIWHk14NVCDeppzTujPHxu35
|
||||
val fileID = url.substringAfter(oldPrefixedHost).substringAfter("/f/")
|
||||
val sanitizedURL = "$newPrefixedHost/loki/v1/f/$fileID"
|
||||
val request = Request.Builder().url(sanitizedURL).get()
|
||||
try {
|
||||
val serverPublicKey = if (newPrefixedHost.contains(FileServerAPI.shared.server)) FileServerAPI.fileServerPublicKey
|
||||
else FileServerAPI.shared.getPublicKeyForOpenGroupServer(newPrefixedHost).get()
|
||||
val json = OnionRequestAPI.sendOnionRequest(request.build(), newPrefixedHost, serverPublicKey, isJSONRequired = false).get()
|
||||
val result = json["result"] as? String
|
||||
if (result == null) {
|
||||
Log.d("Loki", "Couldn't parse attachment from: $json.")
|
||||
throw PushNetworkException("Missing response body.")
|
||||
}
|
||||
val body = Base64.decode(result)
|
||||
if (body.size > maxSize) {
|
||||
Log.d("Loki", "Attachment size limit exceeded.")
|
||||
throw PushNetworkException("Max response size exceeded.")
|
||||
}
|
||||
body.inputStream().use { input ->
|
||||
val buffer = ByteArray(32768)
|
||||
var count = 0
|
||||
var bytes = input.read(buffer)
|
||||
while (bytes >= 0) {
|
||||
outputStream.write(buffer, 0, bytes)
|
||||
count += bytes
|
||||
if (count > maxSize) {
|
||||
Log.d("Loki", "Attachment size limit exceeded.")
|
||||
throw PushNetworkException("Max response size exceeded.")
|
||||
}
|
||||
listener?.onAttachmentProgress(body.size.toLong(), count.toLong())
|
||||
bytes = input.read(buffer)
|
||||
|
||||
if (url.contains(FileServerAPIV2.DEFAULT_SERVER)) {
|
||||
val httpUrl = HttpUrl.parse(url)!!
|
||||
val fileId = httpUrl.pathSegments().last()
|
||||
try {
|
||||
FileServerAPIV2.download(fileId.toLong()).get().let {
|
||||
outputStream.write(it)
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
Log.e("Loki", "Couln't download attachment due to error",e)
|
||||
throw e
|
||||
}
|
||||
} else {
|
||||
// We need to throw a PushNetworkException or NonSuccessfulResponseCodeException
|
||||
// because the underlying Signal logic requires these to work correctly
|
||||
val oldPrefixedHost = "https://" + HttpUrl.get(url).host()
|
||||
var newPrefixedHost = oldPrefixedHost
|
||||
if (oldPrefixedHost.contains(FileServerAPI.fileStorageBucketURL)) {
|
||||
newPrefixedHost = FileServerAPI.shared.server
|
||||
}
|
||||
// Edge case that needs to work: https://file-static.lokinet.org/i1pNmpInq3w9gF3TP8TFCa1rSo38J6UM
|
||||
// → https://file.getsession.org/loki/v1/f/XLxogNXVEIWHk14NVCDeppzTujPHxu35
|
||||
val fileID = url.substringAfter(oldPrefixedHost).substringAfter("/f/")
|
||||
val sanitizedURL = "$newPrefixedHost/loki/v1/f/$fileID"
|
||||
val request = Request.Builder().url(sanitizedURL).get()
|
||||
try {
|
||||
val serverPublicKey = if (newPrefixedHost.contains(FileServerAPI.shared.server)) FileServerAPI.fileServerPublicKey
|
||||
else FileServerAPI.shared.getPublicKeyForOpenGroupServer(newPrefixedHost).get()
|
||||
val json = OnionRequestAPI.sendOnionRequest(request.build(), newPrefixedHost, serverPublicKey, isJSONRequired = false).get()
|
||||
val result = json["result"] as? String
|
||||
if (result == null) {
|
||||
Log.d("Loki", "Couldn't parse attachment from: $json.")
|
||||
throw PushNetworkException("Missing response body.")
|
||||
}
|
||||
val body = Base64.decode(result)
|
||||
if (body.size > maxSize) {
|
||||
Log.d("Loki", "Attachment size limit exceeded.")
|
||||
throw PushNetworkException("Max response size exceeded.")
|
||||
}
|
||||
body.inputStream().use { input ->
|
||||
val buffer = ByteArray(32768)
|
||||
var count = 0
|
||||
var bytes = input.read(buffer)
|
||||
while (bytes >= 0) {
|
||||
outputStream.write(buffer, 0, bytes)
|
||||
count += bytes
|
||||
if (count > maxSize) {
|
||||
Log.d("Loki", "Attachment size limit exceeded.")
|
||||
throw PushNetworkException("Max response size exceeded.")
|
||||
}
|
||||
listener?.onAttachmentProgress(body.size.toLong(), count.toLong())
|
||||
bytes = input.read(buffer)
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
Log.e("Loki", "Couldn't download attachment due to error", e)
|
||||
throw if (e is NonSuccessfulResponseCodeException) e else PushNetworkException(e)
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
Log.d("Loki", "Couldn't download attachment due to error: $e.")
|
||||
throw if (e is NonSuccessfulResponseCodeException) e else PushNetworkException(e)
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user