Merge branch 'dev' of https://github.com/loki-project/session-android into data-extraction-2

This commit is contained in:
Brice-W
2021-04-08 15:00:31 +10:00
71 changed files with 1056 additions and 1530 deletions

View File

@@ -23,7 +23,7 @@ interface MessageDataProvider {
fun setAttachmentState(attachmentState: AttachmentState, attachmentId: Long, messageID: Long)
fun insertAttachment(messageId: Long, attachmentId: Long, stream : InputStream)
fun insertAttachment(messageId: Long, attachmentId: AttachmentId, stream : InputStream)
fun isOutgoingMessage(timestamp: Long): Boolean
@@ -31,9 +31,9 @@ interface MessageDataProvider {
fun updateAttachmentAfterUploadFailed(attachmentId: Long)
// Quotes
fun getMessageForQuote(timestamp: Long, author: Address): Long?
fun getAttachmentsAndLinkPreviewFor(messageID: Long): List<Attachment>
fun getMessageBodyFor(messageID: Long): String
fun getMessageForQuote(timestamp: Long, author: Address): Pair<Long, Boolean>?
fun getAttachmentsAndLinkPreviewFor(mmsId: Long): List<Attachment>
fun getMessageBodyFor(timestamp: Long, author: String): String
fun getAttachmentIDsFor(messageID: Long): List<Long>
fun getLinkPreviewAttachmentIDFor(messageID: Long): Long?

View File

@@ -6,23 +6,22 @@ import android.net.Uri
import org.session.libsession.messaging.jobs.AttachmentUploadJob
import org.session.libsession.messaging.jobs.Job
import org.session.libsession.messaging.jobs.MessageSendJob
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.control.ConfigurationMessage
import org.session.libsession.messaging.messages.visible.Attachment
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.messaging.opengroups.OpenGroup
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentId
import org.session.libsession.messaging.sending_receiving.dataextraction.DataExtractionNotificationInfoMessage
import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment
import org.session.libsession.messaging.sending_receiving.linkpreview.LinkPreview
import org.session.libsession.messaging.sending_receiving.quotes.QuoteModel
import org.session.libsession.messaging.threads.Address
import org.session.libsession.messaging.threads.GroupRecord
import org.session.libsession.messaging.threads.recipients.Recipient.RecipientSettings
import org.session.libsignal.libsignal.ecc.ECKeyPair
import org.session.libsignal.libsignal.ecc.ECPrivateKey
import org.session.libsignal.service.api.messages.SignalServiceAttachmentPointer
import org.session.libsignal.service.api.messages.SignalServiceGroup
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.service.loki.api.opengroups.PublicChat
interface StorageProtocol {
@@ -33,8 +32,10 @@ interface StorageProtocol {
fun getUserDisplayName(): String?
fun getUserProfileKey(): ByteArray?
fun getUserProfilePictureURL(): String?
fun setUserProfilePictureUrl(newProfilePicture: String)
fun getProfileKeyForRecipient(recipientPublicKey: String): ByteArray?
fun setProfileKeyForRecipient(recipientPublicKey: String, profileKey: ByteArray)
// Signal Protocol
@@ -58,7 +59,7 @@ interface StorageProtocol {
// Open Groups
fun getOpenGroup(threadID: String): OpenGroup?
fun getThreadID(openGroupID: String): String?
fun getAllOpenGroups(): Map<Long, PublicChat>
fun getAllOpenGroups(): Map<Long, OpenGroup>
fun addOpenGroup(server: String, channel: Long)
fun setOpenGroupServerMessageID(messageID: Long, serverID: Long)
fun getQuoteServerID(quoteID: Long, publicKey: String): Long?
@@ -95,6 +96,7 @@ interface StorageProtocol {
// fun removeReceivedMessageTimestamps(timestamps: Set<Long>)
// Returns the IDs of the saved attachments.
fun persistAttachments(messageId: Long, attachments: List<Attachment>): List<Long>
fun getAttachmentsForMessage(messageId: Long): List<DatabaseAttachment>
fun getMessageIdInDatabase(timestamp: Long, author: String): Long?
fun markAsSent(timestamp: Long, author: String)
@@ -104,11 +106,13 @@ interface StorageProtocol {
// Closed Groups
fun getGroup(groupID: String): GroupRecord?
fun createGroup(groupID: String, title: String?, members: List<Address>, avatar: SignalServiceAttachmentPointer?, relay: String?, admins: List<Address>, formationTimestamp: Long)
fun isGroupActive(groupPublicKey: String): Boolean
fun setActive(groupID: String, value: Boolean)
fun removeMember(groupID: String, member: Address)
fun updateMembers(groupID: String, members: List<Address>)
// Closed Group
fun getAllClosedGroupPublicKeys(): Set<String>
fun getAllActiveClosedGroupPublicKeys(): Set<String>
fun addClosedGroupPublicKey(groupPublicKey: String)
fun removeClosedGroupPublicKey(groupPublicKey: String)
fun addClosedGroupEncryptionKeyPair(encryptionKeyPair: ECKeyPair, groupPublicKey: String)
@@ -140,11 +144,13 @@ interface StorageProtocol {
// Loki User
fun getDisplayName(publicKey: String): String?
fun setDisplayName(publicKey: String, newName: String)
fun getServerDisplayName(serverID: String, publicKey: String): String?
fun getProfilePictureURL(publicKey: String): String?
// Recipient
fun getRecipientSettings(address: Address): RecipientSettings?
fun addContacts(contacts: List<ConfigurationMessage.Contact>)
// PartAuthority
fun getAttachmentDataUri(attachmentId: AttachmentId): Uri
@@ -152,7 +158,7 @@ interface StorageProtocol {
// Message Handling
/// Returns the ID of the `TSIncomingMessage` that was constructed.
fun persist(message: VisibleMessage, quotes: QuoteModel?, linkPreview: List<LinkPreview?>, groupPublicKey: String?, openGroupID: String?): Long?
fun persist(message: VisibleMessage, quotes: QuoteModel?, linkPreview: List<LinkPreview?>, groupPublicKey: String?, openGroupID: String?, attachments: List<Attachment>): Long?
// Data Extraction Notification
fun insertDataExtractionNotificationMessage(senderPublicKey: String, message: DataExtractionNotificationInfoMessage, groupID: String?, sentTimestamp: Long)

View File

@@ -5,6 +5,8 @@ import org.session.libsession.messaging.fileserver.FileServerAPI
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentState
import org.session.libsession.messaging.utilities.DotNetAPI
import org.session.libsignal.service.api.crypto.AttachmentCipherInputStream
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.logging.Log
import java.io.File
import java.io.FileInputStream
@@ -32,12 +34,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
}
override fun execute() {
val messageDataProvider = MessagingConfiguration.shared.messageDataProvider
val attachmentStream = messageDataProvider.getAttachmentStream(attachmentID) ?: return handleFailure(Error.NoAttachment)
messageDataProvider.setAttachmentState(AttachmentState.STARTED, attachmentID, this.databaseMessageID)
val tempFile = createTempFile()
val handleFailure: (java.lang.Exception) -> Unit = { exception ->
tempFile.delete()
if(exception is Error && exception == Error.NoAttachment) {
MessagingConfiguration.shared.messageDataProvider.setAttachmentState(AttachmentState.FAILED, attachmentID, databaseMessageID)
this.handlePermanentFailure(exception)
@@ -51,24 +48,30 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
}
}
try {
FileServerAPI.shared.downloadFile(tempFile, attachmentStream.url, MAX_ATTACHMENT_SIZE, attachmentStream.listener)
val messageDataProvider = MessagingConfiguration.shared.messageDataProvider
val attachment = messageDataProvider.getDatabaseAttachment(attachmentID) ?: return handleFailure(Error.NoAttachment)
messageDataProvider.setAttachmentState(AttachmentState.STARTED, attachmentID, this.databaseMessageID)
val tempFile = createTempFile()
FileServerAPI.shared.downloadFile(tempFile, attachment.url, MAX_ATTACHMENT_SIZE, null)
// DECRYPTION
// Assume we're retrieving an attachment for an open group server if the digest is not set
val stream = if (attachment.digest?.size ?: 0 == 0 || attachment.key.isNullOrEmpty()) FileInputStream(tempFile)
else AttachmentCipherInputStream.createForAttachment(tempFile, attachment.size, Base64.decode(attachment.key), attachment.digest)
messageDataProvider.insertAttachment(databaseMessageID, attachment.attachmentId, stream)
tempFile.delete()
handleSuccess()
} catch (e: Exception) {
return handleFailure(e)
}
// DECRYPTION
// Assume we're retrieving an attachment for an open group server if the digest is not set
var stream = if (!attachmentStream.digest.isPresent || attachmentStream.key == null) FileInputStream(tempFile)
else AttachmentCipherInputStream.createForAttachment(tempFile, attachmentStream.length.or(0).toLong(), attachmentStream.key?.toByteArray(), attachmentStream?.digest.get())
messageDataProvider.insertAttachment(databaseMessageID, attachmentID, stream)
tempFile.delete()
}
private fun handleSuccess() {
Log.w(AttachmentUploadJob.TAG, "Attachment downloaded successfully.")
delegate?.handleJobSucceeded(this)
}

View File

@@ -15,7 +15,6 @@ import org.session.libsignal.service.internal.push.PushAttachmentData
import org.session.libsignal.service.internal.push.http.AttachmentCipherOutputStreamFactory
import org.session.libsignal.service.internal.util.Util
import org.session.libsignal.service.loki.utilities.PlaintextOutputStreamFactory
import org.session.libsignal.utilities.ThreadUtils
import org.session.libsignal.utilities.logging.Log
class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val message: Message, val messageSendJobID: String) : Job {
@@ -45,41 +44,40 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
}
override fun execute() {
ThreadUtils.queue {
try {
val attachment = MessagingConfiguration.shared.messageDataProvider.getScaledSignalAttachmentStream(attachmentID)
?: return@queue handleFailure(Error.NoAttachment)
try {
val attachment = MessagingConfiguration.shared.messageDataProvider.getScaledSignalAttachmentStream(attachmentID)
?: return handleFailure(Error.NoAttachment)
var server = FileServerAPI.shared.server
var shouldEncrypt = true
val usePadding = false
val openGroup = MessagingConfiguration.shared.storage.getOpenGroup(threadID)
openGroup?.let {
server = it.server
shouldEncrypt = false
}
var server = FileServerAPI.shared.server
var shouldEncrypt = true
val usePadding = false
val openGroup = MessagingConfiguration.shared.storage.getOpenGroup(threadID)
openGroup?.let {
server = it.server
shouldEncrypt = false
}
val attachmentKey = Util.getSecretBytes(64)
val paddedLength = if (usePadding) PaddingInputStream.getPaddedSize(attachment.length) else attachment.length
val dataStream = if (usePadding) PaddingInputStream(attachment.inputStream, attachment.length) else attachment.inputStream
val ciphertextLength = if (shouldEncrypt) AttachmentCipherOutputStream.getCiphertextLength(paddedLength) else attachment.length
val attachmentKey = Util.getSecretBytes(64)
val paddedLength = if (usePadding) PaddingInputStream.getPaddedSize(attachment.length) else attachment.length
val dataStream = if (usePadding) PaddingInputStream(attachment.inputStream, attachment.length) else attachment.inputStream
val ciphertextLength = if (shouldEncrypt) AttachmentCipherOutputStream.getCiphertextLength(paddedLength) else attachment.length
val outputStreamFactory = if (shouldEncrypt) AttachmentCipherOutputStreamFactory(attachmentKey) else PlaintextOutputStreamFactory()
val attachmentData = PushAttachmentData(attachment.contentType, dataStream, ciphertextLength, outputStreamFactory, attachment.listener)
val outputStreamFactory = if (shouldEncrypt) AttachmentCipherOutputStreamFactory(attachmentKey) else PlaintextOutputStreamFactory()
val attachmentData = PushAttachmentData(attachment.contentType, dataStream, ciphertextLength, outputStreamFactory, attachment.listener)
val uploadResult = FileServerAPI.shared.uploadAttachment(server, attachmentData)
handleSuccess(attachment, attachmentKey, uploadResult)
val uploadResult = FileServerAPI.shared.uploadAttachment(server, attachmentData)
handleSuccess(attachment, attachmentKey, uploadResult)
} catch (e: java.lang.Exception) {
if (e is Error && e == Error.NoAttachment) {
this.handlePermanentFailure(e)
} else if (e is DotNetAPI.Error && !e.isRetryable) {
this.handlePermanentFailure(e)
} else {
this.handleFailure(e)
}
} catch (e: java.lang.Exception) {
if (e is Error && e == Error.NoAttachment) {
this.handlePermanentFailure(e)
} else if (e is DotNetAPI.Error && !e.isRetryable) {
this.handlePermanentFailure(e)
} else {
this.handleFailure(e)
}
}
}
private fun handleSuccess(attachment: SignalServiceAttachmentStream, attachmentKey: ByteArray, uploadResult: DotNetAPI.UploadResult) {

View File

@@ -1,32 +1,51 @@
package org.session.libsession.messaging.jobs
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsignal.utilities.logging.Log
import java.util.*
import java.util.concurrent.Executors
import kotlin.concurrent.schedule
import kotlin.math.min
import kotlin.math.pow
import java.util.Timer
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsignal.utilities.logging.Log
import kotlin.concurrent.schedule
import kotlin.math.roundToLong
class JobQueue : JobDelegate {
private var hasResumedPendingJobs = false // Just for debugging
private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private val scope = GlobalScope + SupervisorJob()
private val queue = Channel<Job>(UNLIMITED)
val timer = Timer()
init {
// process jobs
scope.launch(dispatcher) {
while (isActive) {
queue.receive().let { job ->
job.delegate = this@JobQueue
job.execute()
}
}
}
}
companion object {
@JvmStatic
val shared: JobQueue by lazy { JobQueue() }
}
fun add(job: Job) {
addWithoutExecuting(job)
job.execute()
queue.offer(job) // offer always called on unlimited capacity
}
fun addWithoutExecuting(job: Job) {
private fun addWithoutExecuting(job: Job) {
job.id = System.currentTimeMillis().toString()
MessagingConfiguration.shared.storage.persistJob(job)
job.delegate = this
}
fun resumePendingJobs() {
@@ -40,8 +59,7 @@ class JobQueue : JobDelegate {
val allPendingJobs = MessagingConfiguration.shared.storage.getAllPendingJobs(type)
allPendingJobs.sortedBy { it.id }.forEach { job ->
Log.i("Jobs", "Resuming pending job of type: ${job::class.simpleName}.")
job.delegate = this
job.execute()
queue.offer(job) // offer always called on unlimited capacity
}
}
}
@@ -60,9 +78,9 @@ class JobQueue : JobDelegate {
} else {
val retryInterval = getRetryInterval(job)
Log.i("Jobs", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).")
Timer().schedule(delay = retryInterval) {
timer.schedule(delay = retryInterval) {
Log.i("Jobs", "Retrying ${job::class.simpleName}.")
job.execute()
queue.offer(job)
}
}
}

View File

@@ -18,6 +18,8 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val
val TAG = MessageReceiveJob::class.simpleName
val KEY: String = "MessageReceiveJob"
private val RECEIVE_LOCK = Object()
//keys used for database storage purpose
private val KEY_DATA = "data"
private val KEY_IS_BACKGROUND_POLL = "is_background_poll"
@@ -34,17 +36,19 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val
try {
val isRetry: Boolean = failureCount != 0
val (message, proto) = MessageReceiver.parse(this.data, this.openGroupMessageServerID, isRetry)
MessageReceiver.handle(message, proto, this.openGroupID)
synchronized(RECEIVE_LOCK) {
MessageReceiver.handle(message, proto, this.openGroupID)
}
this.handleSuccess()
deferred.resolve(Unit)
} catch (e: Exception) {
Log.d(TAG, "Couldn't receive message due to error: $e.")
Log.e(TAG, "Couldn't receive message due to error", e)
val error = e as? MessageReceiver.Error
if (error != null && !error.isRetryable) {
Log.d("Loki", "Message receive job permanently failed due to error: $error.")
Log.e("Loki", "Message receive job permanently failed due to error", e)
this.handlePermanentFailure(error)
} else {
Log.d("Loki", "Couldn't receive message due to error: $e.")
Log.e("Loki", "Couldn't receive message due to error", e)
this.handleFailure(e)
}
deferred.resolve(Unit) // The promise is just used to keep track of when we're done

View File

@@ -8,12 +8,12 @@ import org.session.libsession.utilities.GroupUtil
import org.session.libsignal.libsignal.ecc.DjbECPrivateKey
import org.session.libsignal.libsignal.ecc.DjbECPublicKey
import org.session.libsignal.libsignal.ecc.ECKeyPair
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.service.internal.push.SignalServiceProtos.DataMessage
import org.session.libsignal.service.loki.utilities.removing05PrefixIfNeeded
import org.session.libsignal.service.loki.utilities.toHexString
import org.session.libsignal.utilities.Hex
import org.session.libsignal.utilities.logging.Log
class ClosedGroupControlMessage() : ControlMessage() {
@@ -72,7 +72,8 @@ class ClosedGroupControlMessage() : ControlMessage() {
const val TAG = "ClosedGroupControlMessage"
fun fromProto(proto: SignalServiceProtos.Content): ClosedGroupControlMessage? {
val closedGroupControlMessageProto = proto.dataMessage?.closedGroupControlMessage ?: return null
if (!proto.hasDataMessage() || !proto.dataMessage.hasClosedGroupControlMessage()) return null
val closedGroupControlMessageProto = proto.dataMessage?.closedGroupControlMessage!!
val kind: Kind
when(closedGroupControlMessageProto.type) {
DataMessage.ClosedGroupControlMessage.Type.NEW -> {

View File

@@ -1,10 +1,7 @@
package org.session.libsession.messaging.messages.control
import com.google.protobuf.ByteString
import org.session.libsignal.libsignal.ecc.ECKeyPair
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.utilities.logging.Log
import java.lang.Exception
class DataExtractionNotification(): ControlMessage() {
var kind: Kind? = null
@@ -25,7 +22,8 @@ class DataExtractionNotification(): ControlMessage() {
const val TAG = "DataExtractionNotification"
fun fromProto(proto: SignalServiceProtos.Content): DataExtractionNotification? {
val dataExtractionNotification = proto.dataExtractionNotification ?: return null
if (!proto.hasDataExtractionNotification()) return null
val dataExtractionNotification = proto.dataExtractionNotification!!
val kind: Kind = when(dataExtractionNotification.type) {
SignalServiceProtos.DataExtractionNotification.Type.SCREENSHOT -> Kind.Screenshot()
SignalServiceProtos.DataExtractionNotification.Type.MEDIA_SAVED -> {

View File

@@ -7,23 +7,29 @@ import org.session.libsignal.service.internal.push.SignalServiceProtos
class ExpirationTimerUpdate() : ControlMessage() {
/// In the case of a sync message, the public key of the person the message was targeted at.
/// - Note: `nil` if this isn't a sync message.
var syncTarget: String? = null
var duration: Int? = 0
override val isSelfSendValid: Boolean = true
companion object {
const val TAG = "ExpirationTimerUpdate"
fun fromProto(proto: SignalServiceProtos.Content): ExpirationTimerUpdate? {
val dataMessageProto = proto.dataMessage ?: return null
val dataMessageProto = if (proto.hasDataMessage()) proto.dataMessage else return null
val isExpirationTimerUpdate = dataMessageProto.flags.and(SignalServiceProtos.DataMessage.Flags.EXPIRATION_TIMER_UPDATE_VALUE) != 0
if (!isExpirationTimerUpdate) return null
val syncTarget = dataMessageProto.syncTarget
val duration = dataMessageProto.expireTimer
return ExpirationTimerUpdate(duration)
return ExpirationTimerUpdate(syncTarget, duration)
}
}
//constructor
internal constructor(duration: Int) : this() {
internal constructor(syncTarget: String?, duration: Int) : this() {
this.syncTarget = syncTarget
this.duration = duration
}
@@ -42,7 +48,10 @@ class ExpirationTimerUpdate() : ControlMessage() {
val dataMessageProto = SignalServiceProtos.DataMessage.newBuilder()
dataMessageProto.flags = SignalServiceProtos.DataMessage.Flags.EXPIRATION_TIMER_UPDATE_VALUE
dataMessageProto.expireTimer = duration
syncTarget?.let { dataMessageProto.syncTarget = it }
// Sync target
if (syncTarget != null) {
dataMessageProto.syncTarget = syncTarget
}
// Group context
if (MessagingConfiguration.shared.storage.isClosedGroup(recipient!!)) {
try {

View File

@@ -1,7 +1,7 @@
package org.session.libsession.messaging.messages.control
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.utilities.logging.Log
class ReadReceipt() : ControlMessage() {
@@ -11,7 +11,7 @@ class ReadReceipt() : ControlMessage() {
const val TAG = "ReadReceipt"
fun fromProto(proto: SignalServiceProtos.Content): ReadReceipt? {
val receiptProto = proto.receiptMessage ?: return null
val receiptProto = if (proto.hasReceiptMessage()) proto.receiptMessage else return null
if (receiptProto.type != SignalServiceProtos.ReceiptMessage.Type.READ) return null
val timestamps = receiptProto.timestampList
if (timestamps.isEmpty()) return null

View File

@@ -1,7 +1,7 @@
package org.session.libsession.messaging.messages.control
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.utilities.logging.Log
class TypingIndicator() : ControlMessage() {
@@ -11,7 +11,7 @@ class TypingIndicator() : ControlMessage() {
const val TAG = "TypingIndicator"
fun fromProto(proto: SignalServiceProtos.Content): TypingIndicator? {
val typingIndicatorProto = proto.typingMessage ?: return null
val typingIndicatorProto = if (proto.hasTypingMessage()) proto.typingMessage else return null
val kind = Kind.fromProto(typingIndicatorProto.action)
return TypingIndicator(kind = kind)
}

View File

@@ -73,13 +73,13 @@ public class IncomingMediaMessage {
Address from,
long expiresIn,
Optional<SignalServiceGroup> group,
Optional<List<SignalServiceAttachment>> attachments,
List<SignalServiceAttachment> attachments,
Optional<QuoteModel> quote,
Optional<List<LinkPreview>> linkPreviews,
Optional<DataExtractionNotificationInfoMessage> dataExtractionNotification)
{
return new IncomingMediaMessage(from, message.getReceivedTimestamp(), -1, expiresIn, false,
false, Optional.fromNullable(message.getText()), group, attachments, quote, Optional.absent(), linkPreviews, dataExtractionNotification);
return new IncomingMediaMessage(from, message.getSentTimestamp(), -1, expiresIn, false,
false, Optional.fromNullable(message.getText()), group, Optional.fromNullable(attachments), quote, Optional.absent(), linkPreviews, dataExtractionNotification);
}
public int getSubscriptionId() {

View File

@@ -2,6 +2,7 @@ package org.session.libsession.messaging.messages.signal;
import android.os.Parcel;
import android.os.Parcelable;
import androidx.annotation.Nullable;
import org.session.libsession.messaging.messages.visible.VisibleMessage;
@@ -100,7 +101,7 @@ public class IncomingTextMessage implements Parcelable {
Optional<SignalServiceGroup> group,
long expiresInMillis)
{
return new IncomingTextMessage(sender, 1, message.getReceivedTimestamp(), message.getText(), group, expiresInMillis, false);
return new IncomingTextMessage(sender, 1, message.getSentTimestamp(), message.getText(), group, expiresInMillis, false);
}
public int getSubscriptionId() {

View File

@@ -12,7 +12,7 @@ public class OutgoingExpirationUpdateMessage extends OutgoingSecureMediaMessage
public OutgoingExpirationUpdateMessage(Recipient recipient, long sentTimeMillis, long expiresIn) {
super(recipient, "", new LinkedList<Attachment>(), sentTimeMillis,
DistributionTypes.CONVERSATION, expiresIn, null, Collections.emptyList(),
DistributionTypes.CONVERSATION, expiresIn, true, null, Collections.emptyList(),
Collections.emptyList());
}

View File

@@ -32,7 +32,7 @@ public class OutgoingGroupMediaMessage extends OutgoingSecureMediaMessage {
throws IOException
{
super(recipient, encodedGroupContext, avatar, sentTimeMillis,
DistributionTypes.CONVERSATION, expiresIn, quote, contacts, previews);
DistributionTypes.CONVERSATION, expiresIn, false, quote, contacts, previews);
this.group = GroupContext.parseFrom(Base64.decode(encodedGroupContext));
}
@@ -48,7 +48,7 @@ public class OutgoingGroupMediaMessage extends OutgoingSecureMediaMessage {
super(recipient, Base64.encodeBytes(group.toByteArray()),
new LinkedList<Attachment>() {{if (avatar != null) add(avatar);}},
System.currentTimeMillis(),
DistributionTypes.CONVERSATION, expireIn, quote, contacts, previews);
DistributionTypes.CONVERSATION, expireIn, false, quote, contacts, previews);
this.group = group;
}
@@ -65,7 +65,7 @@ public class OutgoingGroupMediaMessage extends OutgoingSecureMediaMessage {
super(recipient, Base64.encodeBytes(group.toByteArray()),
new LinkedList<Attachment>() {{if (avatar != null) add(avatar);}},
sentTime,
DistributionTypes.CONVERSATION, expireIn, quote, contacts, previews);
DistributionTypes.CONVERSATION, expireIn, false, quote, contacts, previews);
this.group = group;
}

View File

@@ -26,6 +26,7 @@ public class OutgoingMediaMessage {
private final int distributionType;
private final int subscriptionId;
private final long expiresIn;
private final boolean expirationUpdate;
private final QuoteModel outgoingQuote;
private final List<NetworkFailure> networkFailures = new LinkedList<>();
@@ -36,6 +37,7 @@ public class OutgoingMediaMessage {
public OutgoingMediaMessage(Recipient recipient, String message,
List<Attachment> attachments, long sentTimeMillis,
int subscriptionId, long expiresIn,
boolean expirationUpdate,
int distributionType,
@Nullable QuoteModel outgoingQuote,
@NonNull List<Contact> contacts,
@@ -50,6 +52,7 @@ public class OutgoingMediaMessage {
this.attachments = attachments;
this.subscriptionId = subscriptionId;
this.expiresIn = expiresIn;
this.expirationUpdate = expirationUpdate;
this.outgoingQuote = outgoingQuote;
this.contacts.addAll(contacts);
@@ -66,6 +69,7 @@ public class OutgoingMediaMessage {
this.sentTimeMillis = that.sentTimeMillis;
this.subscriptionId = that.subscriptionId;
this.expiresIn = that.expiresIn;
this.expirationUpdate = that.expirationUpdate;
this.outgoingQuote = that.outgoingQuote;
this.identityKeyMismatches.addAll(that.identityKeyMismatches);
@@ -85,7 +89,7 @@ public class OutgoingMediaMessage {
previews = Collections.singletonList(linkPreview);
}
return new OutgoingMediaMessage(recipient, message.getText(), attachments, message.getSentTimestamp(), -1,
recipient.getExpireMessages() * 1000, DistributionTypes.DEFAULT, outgoingQuote, Collections.emptyList(),
recipient.getExpireMessages() * 1000, false, DistributionTypes.DEFAULT, outgoingQuote, Collections.emptyList(),
previews, Collections.emptyList(), Collections.emptyList());
}
@@ -109,9 +113,7 @@ public class OutgoingMediaMessage {
return false;
}
public boolean isExpirationUpdate() {
return false;
}
public boolean isExpirationUpdate() { return expirationUpdate; }
public long getSentTimeMillis() {
return sentTimeMillis;

View File

@@ -19,11 +19,12 @@ public class OutgoingSecureMediaMessage extends OutgoingMediaMessage {
long sentTimeMillis,
int distributionType,
long expiresIn,
boolean expirationUpdate,
@Nullable QuoteModel quote,
@NonNull List<Contact> contacts,
@NonNull List<LinkPreview> previews)
{
super(recipient, body, attachments, sentTimeMillis, -1, expiresIn, distributionType, quote, contacts, previews, Collections.emptyList(), Collections.emptyList());
super(recipient, body, attachments, sentTimeMillis, -1, expiresIn, expirationUpdate, distributionType, quote, contacts, previews, Collections.emptyList(), Collections.emptyList());
}
public OutgoingSecureMediaMessage(OutgoingMediaMessage base) {

View File

@@ -9,16 +9,18 @@ public class OutgoingTextMessage {
private final String message;
private final int subscriptionId;
private final long expiresIn;
private final long sentTimestampMillis;
public OutgoingTextMessage(Recipient recipient, String message, long expiresIn, int subscriptionId) {
public OutgoingTextMessage(Recipient recipient, String message, long expiresIn, int subscriptionId, long sentTimestampMillis) {
this.recipient = recipient;
this.message = message;
this.expiresIn = expiresIn;
this.subscriptionId = subscriptionId;
this.sentTimestampMillis = sentTimestampMillis;
}
public static OutgoingTextMessage from(VisibleMessage message, Recipient recipient) {
return new OutgoingTextMessage(recipient, message.getText(), recipient.getExpireMessages() * 1000, -1);
return new OutgoingTextMessage(recipient, message.getText(), recipient.getExpireMessages() * 1000, -1, message.getSentTimestamp());
}
public long getExpiresIn() {
@@ -37,6 +39,10 @@ public class OutgoingTextMessage {
return recipient;
}
public long getSentTimestampMillis() {
return sentTimestampMillis;
}
public boolean isSecureMessage() {
return true;
}

View File

@@ -3,10 +3,10 @@ package org.session.libsession.messaging.messages.visible
import android.util.Size
import android.webkit.MimeTypeMap
import com.google.protobuf.ByteString
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentTransferProgress
import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment
import org.session.libsignal.service.api.messages.SignalServiceAttachmentPointer
import org.session.libsession.messaging.sending_receiving.attachments.Attachment as SignalAttachment
import org.session.libsession.messaging.sending_receiving.attachments.PointerAttachment
import org.session.libsignal.libsignal.util.guava.Optional
import org.session.libsignal.service.api.messages.SignalServiceAttachmentPointer
import org.session.libsignal.service.internal.push.SignalServiceProtos
import java.io.File
@@ -23,7 +23,7 @@ class Attachment {
var url: String? = null
companion object {
fun fromProto(proto: SignalServiceProtos.AttachmentPointer): Attachment? {
fun fromProto(proto: SignalServiceProtos.AttachmentPointer): Attachment {
val result = Attachment()
result.fileName = proto.fileName
fun inferContentType(): String {
@@ -100,8 +100,14 @@ class Attachment {
fun toSignalAttachment(): SignalAttachment? {
if (!isValid()) return null
return DatabaseAttachment(null, 0, false, false, contentType, 0,
sizeInBytes?.toLong() ?: 0, fileName, null, key.toString(), null, digest, null, kind == Kind.VOICE_MESSAGE,
size?.width ?: 0, size?.height ?: 0, false, caption, url)
return PointerAttachment.forAttachment((this))
}
fun toSignalPointer(): SignalServiceAttachmentPointer? {
if (!isValid()) return null
return SignalServiceAttachmentPointer(0, contentType, key, Optional.fromNullable(sizeInBytes), null,
size?.width ?: 0, size?.height ?: 0, Optional.fromNullable(digest), Optional.fromNullable(fileName),
kind == Kind.VOICE_MESSAGE, Optional.fromNullable(caption), url)
}
}

View File

@@ -15,7 +15,7 @@ class VisibleMessage : Message() {
var syncTarget: String? = null
var text: String? = null
var attachmentIDs = ArrayList<Long>()
val attachmentIDs: MutableList<Long> = mutableListOf()
var quote: Quote? = null
var linkPreview: LinkPreview? = null
var contact: Contact? = null
@@ -27,17 +27,19 @@ class VisibleMessage : Message() {
const val TAG = "VisibleMessage"
fun fromProto(proto: SignalServiceProtos.Content): VisibleMessage? {
val dataMessage = proto.dataMessage ?: return null
val dataMessage = if (proto.hasDataMessage()) proto.dataMessage else return null
val result = VisibleMessage()
result.syncTarget = dataMessage.syncTarget
if (dataMessage.hasSyncTarget()) {
result.syncTarget = dataMessage.syncTarget
}
result.text = dataMessage.body
// Attachments are handled in MessageReceiver
val quoteProto = dataMessage.quote
val quoteProto = if (dataMessage.hasQuote()) dataMessage.quote else null
quoteProto?.let {
val quote = Quote.fromProto(quoteProto)
quote?.let { result.quote = quote }
}
val linkPreviewProto = dataMessage.previewList.first()
val linkPreviewProto = dataMessage.previewList.firstOrNull()
linkPreviewProto?.let {
val linkPreview = LinkPreview.fromProto(linkPreviewProto)
linkPreview?.let { result.linkPreview = linkPreview }
@@ -54,7 +56,7 @@ class VisibleMessage : Message() {
val databaseAttachment = it as DatabaseAttachment
databaseAttachment.attachmentId.rowId
}
this.attachmentIDs = attachmentIDs as ArrayList<Long>
this.attachmentIDs.addAll(attachmentIDs)
}
fun isMediaMessage(): Boolean {

View File

@@ -1,5 +1,6 @@
package org.session.libsession.messaging.opengroups
import org.session.libsignal.service.loki.api.opengroups.PublicChat
import org.session.libsignal.utilities.JsonUtil
data class OpenGroup(
@@ -13,6 +14,9 @@ data class OpenGroup(
companion object {
@JvmStatic fun from(publicChat: PublicChat): OpenGroup =
OpenGroup(publicChat.channel, publicChat.server, publicChat.displayName, publicChat.isDeletable)
@JvmStatic fun getId(channel: Long, server: String): String {
return "$server.$channel"
}

View File

@@ -6,15 +6,13 @@ import nl.komponents.kovenant.deferred
import nl.komponents.kovenant.functional.map
import nl.komponents.kovenant.then
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.utilities.DotNetAPI
import org.session.libsession.messaging.fileserver.FileServerAPI
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.utilities.*
import org.session.libsession.messaging.utilities.DotNetAPI
import org.session.libsignal.service.loki.utilities.DownloadUtilities
import org.session.libsignal.service.loki.utilities.retryIfNeeded
import org.session.libsignal.utilities.*
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.logging.Log
import java.io.ByteArrayOutputStream
import java.text.SimpleDateFormat
import java.util.*
@@ -156,6 +154,7 @@ object OpenGroupAPI: DotNetAPI() {
}
}
@JvmStatic
fun getDeletedMessageServerIDs(channel: Long, server: String): Promise<List<Long>, Exception> {
Log.d("Loki", "Getting deleted messages for open group with ID: $channel on server: $server.")
val storage = MessagingConfiguration.shared.storage
@@ -188,6 +187,7 @@ object OpenGroupAPI: DotNetAPI() {
}
}
@JvmStatic
fun sendMessage(message: OpenGroupMessage, channel: Long, server: String): Promise<OpenGroupMessage, Exception> {
val deferred = deferred<OpenGroupMessage, Exception>()
val storage = MessagingConfiguration.shared.storage
@@ -252,6 +252,7 @@ object OpenGroupAPI: DotNetAPI() {
}
}
@JvmStatic
fun getModerators(channel: Long, server: String): Promise<Set<String>, Exception> {
return execute(HTTPVerb.GET, server, "loki/v1/channel/$channel/get_moderators").then(sharedContext) { json ->
try {
@@ -270,6 +271,7 @@ object OpenGroupAPI: DotNetAPI() {
}
}
@JvmStatic
fun getChannelInfo(channel: Long, server: String): Promise<OpenGroupInfo, Exception> {
return retryIfNeeded(maxRetryCount) {
val parameters = mapOf( "include_annotations" to 1 )
@@ -294,6 +296,7 @@ object OpenGroupAPI: DotNetAPI() {
}
}
@JvmStatic
fun updateProfileIfNeeded(channel: Long, server: String, groupID: String, info: OpenGroupInfo, isForcedUpdate: Boolean) {
val storage = MessagingConfiguration.shared.storage
storage.setUserCount(channel, server, info.memberCount)
@@ -307,6 +310,7 @@ object OpenGroupAPI: DotNetAPI() {
}
}
@JvmStatic
fun downloadOpenGroupProfilePicture(server: String, endpoint: String): ByteArray? {
val url = "${server.removeSuffix("/")}/${endpoint.removePrefix("/")}"
Log.d("Loki", "Downloading open group profile picture from \"$url\".")
@@ -323,6 +327,7 @@ object OpenGroupAPI: DotNetAPI() {
}
}
@JvmStatic
fun join(channel: Long, server: String): Promise<Unit, Exception> {
return retryIfNeeded(maxRetryCount) {
execute(HTTPVerb.POST, server, "/channels/$channel/subscribe").then {
@@ -331,6 +336,7 @@ object OpenGroupAPI: DotNetAPI() {
}
}
@JvmStatic
fun leave(channel: Long, server: String): Promise<Unit, Exception> {
return retryIfNeeded(maxRetryCount) {
execute(HTTPVerb.DELETE, server, "/channels/$channel/subscribe").then {
@@ -348,6 +354,7 @@ object OpenGroupAPI: DotNetAPI() {
}
}
@JvmStatic
fun getDisplayNames(publicKeys: Set<String>, server: String): Promise<Map<String, String>, Exception> {
return getUserProfiles(publicKeys, server, false).map(sharedContext) { json ->
val mapping = mutableMapOf<String, String>()
@@ -362,12 +369,14 @@ object OpenGroupAPI: DotNetAPI() {
}
}
@JvmStatic
fun setDisplayName(newDisplayName: String?, server: String): Promise<Unit, Exception> {
Log.d("Loki", "Updating display name on server: $server.")
val parameters = mapOf( "name" to (newDisplayName ?: "") )
return execute(HTTPVerb.PATCH, server, "users/me", parameters = parameters).map { Unit }
}
@JvmStatic
fun setProfilePicture(server: String, profileKey: ByteArray, url: String?): Promise<Unit, Exception> {
return setProfilePicture(server, Base64.encodeBytes(profileKey), url)
}

View File

@@ -2,9 +2,9 @@ package org.session.libsession.messaging.opengroups
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.utilities.Hex
import org.session.libsignal.service.loki.utilities.removing05PrefixIfNeeded
import org.session.libsignal.utilities.Hex
import org.session.libsignal.utilities.logging.Log
import org.whispersystems.curve25519.Curve25519
data class OpenGroupMessage(
@@ -26,6 +26,7 @@ data class OpenGroupMessage(
fun from(message: VisibleMessage, server: String): OpenGroupMessage? {
val storage = MessagingConfiguration.shared.storage
val userPublicKey = storage.getUserPublicKey() ?: return null
val attachmentIDs = message.attachmentIDs
// Validation
if (!message.isValid()) { return null } // Should be valid at this point
// Quote
@@ -41,7 +42,8 @@ data class OpenGroupMessage(
}()
// Message
val displayname = storage.getUserDisplayName() ?: "Anonymous"
val body = message.text ?: message.sentTimestamp.toString() // The back-end doesn't accept messages without a body so we use this as a workaround
val text = message.text
val body = if (text.isNullOrEmpty()) message.sentTimestamp.toString() else text // The back-end doesn't accept messages without a body so we use this as a workaround
val result = OpenGroupMessage(null, userPublicKey, displayname, body, message.sentTimestamp!!, OpenGroupAPI.openGroupMessageType, quote, mutableListOf(), null, null, 0)
// Link preview
val linkPreview = message.linkPreview

View File

@@ -4,8 +4,10 @@ import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.control.*
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.utilities.GroupUtil
import org.session.libsignal.service.internal.push.PushTransportDetails
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.utilities.logging.Log
object MessageReceiver {
@@ -50,8 +52,7 @@ object MessageReceiver {
// If the message failed to process the first time around we retry it later (if the error is retryable). In this case the timestamp
// will already be in the database but we don't want to treat the message as a duplicate. The isRetry flag is a simple workaround
// for this issue.
if (storage.isMessageDuplicated(envelope.timestamp, envelope.source) && !isRetry) throw Error.DuplicateMessage
storage.addReceivedMessageTimestamp(envelope.timestamp)
if (storage.isMessageDuplicated(envelope.timestamp, GroupUtil.doubleEncodeGroupID(envelope.source)) && !isRetry) throw Error.DuplicateMessage
// Decrypt the contents
val ciphertext = envelope.content ?: throw Error.NoData
var plaintext: ByteArray? = null
@@ -70,7 +71,7 @@ object MessageReceiver {
}
SignalServiceProtos.Envelope.Type.CLOSED_GROUP_CIPHERTEXT -> {
val hexEncodedGroupPublicKey = envelope.source
if (hexEncodedGroupPublicKey == null || MessagingConfiguration.shared.storage.isClosedGroup(hexEncodedGroupPublicKey)) {
if (hexEncodedGroupPublicKey == null || !MessagingConfiguration.shared.storage.isClosedGroup(hexEncodedGroupPublicKey)) {
throw Error.InvalidGroupPublicKey
}
val encryptionKeyPairs = MessagingConfiguration.shared.storage.getClosedGroupEncryptionKeyPairs(hexEncodedGroupPublicKey)
@@ -94,28 +95,16 @@ object MessageReceiver {
}
groupPublicKey = envelope.source
decrypt()
// try {
// decrypt()
// } catch(error: Exception) {
// val now = System.currentTimeMillis()
// var shouldRequestEncryptionKeyPair = true
// lastEncryptionKeyPairRequest[groupPublicKey!!]?.let {
// shouldRequestEncryptionKeyPair = now - it > 30 * 1000
// }
// if (shouldRequestEncryptionKeyPair) {
// MessageSender.requestEncryptionKeyPair(groupPublicKey)
// lastEncryptionKeyPairRequest[groupPublicKey] = now
// }
// throw error
// }
}
else -> throw Error.UnknownEnvelopeType
}
}
// Don't process the envelope any further if the message has been handled already
if (storage.isMessageDuplicated(envelope.timestamp, sender!!) && !isRetry) throw Error.DuplicateMessage
// Don't process the envelope any further if the sender is blocked
if (isBlock(sender!!)) throw Error.SenderBlocked
// Parse the proto
val proto = SignalServiceProtos.Content.parseFrom(plaintext)
val proto = SignalServiceProtos.Content.parseFrom(PushTransportDetails.getStrippedPaddingMessageBody(plaintext))
// Parse the message
val message: Message = ReadReceipt.fromProto(proto) ?:
TypingIndicator.fromProto(proto) ?:
@@ -132,12 +121,13 @@ object MessageReceiver {
message.sender = sender
message.recipient = userPublicKey
message.sentTimestamp = envelope.timestamp
message.receivedTimestamp = System.currentTimeMillis()
message.receivedTimestamp = if (envelope.hasServerTimestamp()) envelope.serverTimestamp else System.currentTimeMillis()
Log.d("Loki", "time: ${envelope.timestamp}, sent: ${envelope.serverTimestamp}")
message.groupPublicKey = groupPublicKey
message.openGroupServerMessageID = openGroupServerID
// Validate
var isValid = message.isValid()
if (message is VisibleMessage && !isValid && proto.dataMessage.attachmentsCount == 0) { isValid = true }
if (message is VisibleMessage && !isValid && proto.dataMessage.attachmentsCount != 0) { isValid = true }
if (!isValid) { throw Error.InvalidMessage }
// Return
return Pair(message, proto)

View File

@@ -19,15 +19,17 @@ import org.session.libsession.messaging.threads.recipients.Recipient
import org.session.libsession.utilities.GroupUtil
import org.session.libsession.utilities.SSKEnvironment
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsession.utilities.preferences.ProfileKeyUtil
import org.session.libsignal.libsignal.ecc.DjbECPrivateKey
import org.session.libsignal.libsignal.ecc.DjbECPublicKey
import org.session.libsignal.libsignal.ecc.ECKeyPair
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.libsignal.util.guava.Optional
import org.session.libsignal.service.api.messages.SignalServiceGroup
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.service.loki.utilities.removing05PrefixIfNeeded
import org.session.libsignal.service.loki.utilities.toHexString
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.logging.Log
import java.security.MessageDigest
import java.util.*
import kotlin.collections.ArrayList
@@ -43,7 +45,7 @@ fun MessageReceiver.handle(message: Message, proto: SignalServiceProtos.Content,
is ReadReceipt -> handleReadReceipt(message)
is TypingIndicator -> handleTypingIndicator(message)
is ClosedGroupControlMessage -> handleClosedGroupControlMessage(message)
is ExpirationTimerUpdate -> handleExpirationTimerUpdate(message, proto)
is ExpirationTimerUpdate -> handleExpirationTimerUpdate(message)
is DataExtractionNotification -> handleDataExtractionNotification(message)
is ConfigurationMessage -> handleConfigurationMessage(message)
is VisibleMessage -> handleVisibleMessage(message, proto, openGroupID)
@@ -83,27 +85,14 @@ fun MessageReceiver.cancelTypingIndicatorsIfNeeded(senderPublicKey: String) {
SSKEnvironment.shared.typingIndicators.didReceiveIncomingMessage(context, threadID, address, 1)
}
private fun MessageReceiver.handleExpirationTimerUpdate(message: ExpirationTimerUpdate, proto: SignalServiceProtos.Content) {
private fun MessageReceiver.handleExpirationTimerUpdate(message: ExpirationTimerUpdate) {
if (message.duration!! > 0) {
setExpirationTimer(message, proto)
SSKEnvironment.shared.messageExpirationManager.setExpirationTimer(message)
} else {
disableExpirationTimer(message, proto)
SSKEnvironment.shared.messageExpirationManager.disableExpirationTimer(message)
}
}
fun MessageReceiver.setExpirationTimer(message: ExpirationTimerUpdate, proto: SignalServiceProtos.Content) {
val id = message.id
val duration = message.duration!!
val senderPublicKey = message.sender!!
SSKEnvironment.shared.messageExpirationManager.setExpirationTimer(id, duration, senderPublicKey, proto)
}
fun MessageReceiver.disableExpirationTimer(message: ExpirationTimerUpdate, proto: SignalServiceProtos.Content) {
val id = message.id
val senderPublicKey = message.sender!!
SSKEnvironment.shared.messageExpirationManager.disableExpirationTimer(id, senderPublicKey, proto)
}
// Data Extraction Notification handling
private fun MessageReceiver.handleDataExtractionNotification(message: DataExtractionNotification) {
@@ -122,8 +111,11 @@ private fun MessageReceiver.handleDataExtractionNotification(message: DataExtrac
private fun MessageReceiver.handleConfigurationMessage(message: ConfigurationMessage) {
val context = MessagingConfiguration.shared.context
val storage = MessagingConfiguration.shared.storage
if (TextSecurePreferences.getConfigurationMessageSynced(context)) return
if (message.sender != storage.getUserPublicKey()) return
if (TextSecurePreferences.getConfigurationMessageSynced(context) && !TextSecurePreferences.shouldUpdateProfile(context, message.sentTimestamp!!)) return
val userPublicKey = storage.getUserPublicKey()
if (userPublicKey == null || message.sender != storage.getUserPublicKey()) return
TextSecurePreferences.setConfigurationMessageSynced(context, true)
TextSecurePreferences.setLastProfileUpdateTime(context, message.sentTimestamp!!)
val allClosedGroupPublicKeys = storage.getAllClosedGroupPublicKeys()
for (closeGroup in message.closedGroups) {
if (allClosedGroupPublicKeys.contains(closeGroup.publicKey)) continue
@@ -134,25 +126,25 @@ private fun MessageReceiver.handleConfigurationMessage(message: ConfigurationMes
if (allOpenGroups.contains(openGroup)) continue
storage.addOpenGroup(openGroup, 1)
}
// TODO: in future handle the latest in config messages
TextSecurePreferences.setConfigurationMessageSynced(context, true)
if (message.displayName.isNotEmpty()) {
TextSecurePreferences.setProfileName(context, message.displayName)
storage.setDisplayName(userPublicKey, message.displayName)
}
if (message.profileKey.isNotEmpty()) {
val profileKey = Base64.encodeBytes(message.profileKey)
ProfileKeyUtil.setEncodedProfileKey(context, profileKey)
storage.setProfileKeyForRecipient(userPublicKey, message.profileKey)
// handle profile photo
if (!message.profilePicture.isNullOrEmpty() && TextSecurePreferences.getProfilePictureURL(context) != message.profilePicture) {
storage.setUserProfilePictureUrl(message.profilePicture!!)
}
}
storage.addContacts(message.contacts)
}
fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalServiceProtos.Content, openGroupID: String?) {
val storage = MessagingConfiguration.shared.storage
val context = MessagingConfiguration.shared.context
// Parse & persist attachments
val attachments = proto.dataMessage.attachmentsList.mapNotNull { proto ->
val attachment = Attachment.fromProto(proto)
if (attachment == null || !attachment.isValid()) {
return@mapNotNull null
} else {
return@mapNotNull attachment
}
}
val attachmentIDs = storage.persistAttachments(message.id ?: 0, attachments)
message.attachmentIDs = attachmentIDs as ArrayList<Long>
var attachmentsToDownload = attachmentIDs
// Update profile if needed
val newProfile = message.profile
if (newProfile != null) {
@@ -160,11 +152,13 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
val recipient = Recipient.from(context, Address.fromSerialized(message.sender!!), false)
val displayName = newProfile.displayName!!
val userPublicKey = storage.getUserPublicKey()
if (userPublicKey == message.sender) {
// Update the user's local name if the message came from their master device
TextSecurePreferences.setProfileName(context, displayName)
if (openGroupID == null) {
if (userPublicKey == message.sender) {
// Update the user's local name if the message came from their master device
TextSecurePreferences.setProfileName(context, displayName)
}
profileManager.setDisplayName(context, recipient, displayName)
}
profileManager.setDisplayName(context, recipient, displayName)
if (recipient.profileKey == null || !MessageDigest.isEqual(recipient.profileKey, newProfile.profileKey)) {
profileManager.setProfileKey(context, recipient, newProfile.profileKey!!)
profileManager.setUnidentifiedAccessMode(context, recipient, Recipient.UnidentifiedAccessMode.UNKNOWN)
@@ -182,10 +176,10 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
if (message.quote != null && proto.dataMessage.hasQuote()) {
val quote = proto.dataMessage.quote
val author = Address.fromSerialized(quote.author)
val messageID = MessagingConfiguration.shared.messageDataProvider.getMessageForQuote(quote.id, author)
if (messageID != null) {
val attachmentsWithLinkPreview = MessagingConfiguration.shared.messageDataProvider.getAttachmentsAndLinkPreviewFor(messageID)
quoteModel = QuoteModel(quote.id, author, MessagingConfiguration.shared.messageDataProvider.getMessageBodyFor(messageID), false, attachmentsWithLinkPreview)
val messageInfo = MessagingConfiguration.shared.messageDataProvider.getMessageForQuote(quote.id, author)
if (messageInfo != null) {
val attachments = if (messageInfo.second) MessagingConfiguration.shared.messageDataProvider.getAttachmentsAndLinkPreviewFor(messageInfo.first) else ArrayList()
quoteModel = QuoteModel(quote.id, author, MessagingConfiguration.shared.messageDataProvider.getMessageBodyFor(quote.id, quote.author), false, attachments)
} else {
quoteModel = QuoteModel(quote.id, author, quote.text, true, PointerAttachment.forPointers(proto.dataMessage.quote.attachmentsList))
}
@@ -206,14 +200,25 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
}
}
}
val attachments = proto.dataMessage.attachmentsList.mapNotNull { proto ->
val attachment = Attachment.fromProto(proto)
if (!attachment.isValid()) {
return@mapNotNull null
} else {
return@mapNotNull attachment
}
}
// Parse stickers if needed
// Persist the message
val messageID = storage.persist(message, quoteModel, linkPreviews, message.groupPublicKey, openGroupID) ?: throw MessageReceiver.Error.NoThread
message.threadID = threadID
val messageID = storage.persist(message, quoteModel, linkPreviews, message.groupPublicKey, openGroupID, attachments) ?: throw MessageReceiver.Error.NoThread
// Parse & persist attachments
// Start attachment downloads if needed
attachmentsToDownload.forEach { attachmentID ->
val downloadJob = AttachmentDownloadJob(attachmentID, messageID)
JobQueue.shared.add(downloadJob)
storage.getAttachmentsForMessage(messageID).forEach { attachment ->
attachment.attachmentId?.let { id ->
val downloadJob = AttachmentDownloadJob(id.rowId, messageID)
JobQueue.shared.add(downloadJob)
}
}
// Cancel any typing indicators if needed
cancelTypingIndicatorsIfNeeded(message.sender!!)
@@ -283,6 +288,10 @@ private fun MessageReceiver.handleClosedGroupUpdated(message: ClosedGroupControl
Log.d("Loki", "Ignoring closed group info message for nonexistent group.")
return
}
if (!group.isActive) {
Log.d("Loki", "Ignoring closed group info message for inactive group")
return
}
val oldMembers = group.members.map { it.serialize() }
// Check common group update logic
if (!isValidGroupUpdate(group, message.sentTimestamp!!, senderPublicKey)) {
@@ -331,12 +340,16 @@ private fun MessageReceiver.handleClosedGroupEncryptionKeyPair(message: ClosedGr
Log.d("Loki", "Ignoring closed group info message for nonexistent group.")
return
}
if (!group.members.map { it.toString() }.contains(senderPublicKey)) {
if (!group.isActive) {
Log.d("Loki", "Ignoring closed group info message for inactive group")
return
}
if (!group.admins.map { it.toString() }.contains(senderPublicKey)) {
Log.d("Loki", "Ignoring closed group encryption key pair from non-member.")
return
}
// Find our wrapper and decrypt it if possible
val wrapper = kind.wrappers.firstOrNull { it.publicKey!!.toByteArray().toHexString() == userPublicKey } ?: return
val wrapper = kind.wrappers.firstOrNull { it.publicKey!! == userPublicKey } ?: return
val encryptedKeyPair = wrapper.encryptedKeyPair!!.toByteArray()
val plaintext = MessageReceiverDecryption.decryptWithSessionProtocol(encryptedKeyPair, userKeyPair).first
// Parse it
@@ -355,6 +368,7 @@ private fun MessageReceiver.handleClosedGroupEncryptionKeyPair(message: ClosedGr
private fun MessageReceiver.handleClosedGroupNameChanged(message: ClosedGroupControlMessage) {
val context = MessagingConfiguration.shared.context
val storage = MessagingConfiguration.shared.storage
val userPublicKey = TextSecurePreferences.getLocalNumber(context)
val senderPublicKey = message.sender ?: return
val kind = message.kind!! as? ClosedGroupControlMessage.Kind.NameChange ?: return
val groupPublicKey = message.groupPublicKey ?: return
@@ -364,6 +378,10 @@ private fun MessageReceiver.handleClosedGroupNameChanged(message: ClosedGroupCon
Log.d("Loki", "Ignoring closed group info message for nonexistent group.")
return
}
if (!group.isActive) {
Log.d("Loki", "Ignoring closed group info message for inactive group")
return
}
// Check common group update logic
if (!isValidGroupUpdate(group, message.sentTimestamp!!, senderPublicKey)) {
return
@@ -373,7 +391,14 @@ private fun MessageReceiver.handleClosedGroupNameChanged(message: ClosedGroupCon
val name = kind.name
storage.updateTitle(groupID, name)
storage.insertIncomingInfoMessage(context, senderPublicKey, groupID, SignalServiceProtos.GroupContext.Type.UPDATE, SignalServiceGroup.Type.UPDATE, name, members, admins, message.sentTimestamp!!)
// Notify the user
if (userPublicKey == senderPublicKey) {
// sender is a linked device
val threadID = storage.getOrCreateThreadIdFor(Address.fromSerialized(groupID))
storage.insertOutgoingInfoMessage(context, groupID, SignalServiceProtos.GroupContext.Type.UPDATE, name, members, admins, threadID, message.sentTimestamp!!)
} else {
storage.insertIncomingInfoMessage(context, senderPublicKey, groupID, SignalServiceProtos.GroupContext.Type.UPDATE, SignalServiceGroup.Type.UPDATE, name, members, admins, message.sentTimestamp!!)
}
}
private fun MessageReceiver.handleClosedGroupMembersAdded(message: ClosedGroupControlMessage) {
@@ -388,6 +413,10 @@ private fun MessageReceiver.handleClosedGroupMembersAdded(message: ClosedGroupCo
Log.d("Loki", "Ignoring closed group info message for nonexistent group.")
return
}
if (!group.isActive) {
Log.d("Loki", "Ignoring closed group info message for inactive group")
return
}
if (!isValidGroupUpdate(group, message.sentTimestamp!!, senderPublicKey)) { return }
val name = group.title
// Check common group update logic
@@ -397,7 +426,9 @@ private fun MessageReceiver.handleClosedGroupMembersAdded(message: ClosedGroupCo
val updateMembers = kind.members.map { it.toByteArray().toHexString() }
val newMembers = members + updateMembers
storage.updateMembers(groupID, newMembers.map { Address.fromSerialized(it) })
// Notify the user
if (userPublicKey == senderPublicKey) {
// sender is a linked device
val threadID = storage.getOrCreateThreadIdFor(Address.fromSerialized(groupID))
storage.insertOutgoingInfoMessage(context, groupID, SignalServiceProtos.GroupContext.Type.UPDATE, name, members, admins, threadID, message.sentTimestamp!!)
} else {
@@ -415,7 +446,6 @@ private fun MessageReceiver.handleClosedGroupMembersAdded(message: ClosedGroupCo
}
}
}
storage.insertIncomingInfoMessage(context, senderPublicKey, groupID, SignalServiceProtos.GroupContext.Type.UPDATE, SignalServiceGroup.Type.UPDATE, name, members, admins, message.sentTimestamp!!)
}
private fun MessageReceiver.handleClosedGroupMembersRemoved(message: ClosedGroupControlMessage) {
@@ -430,6 +460,10 @@ private fun MessageReceiver.handleClosedGroupMembersRemoved(message: ClosedGroup
Log.d("Loki", "Ignoring closed group info message for nonexistent group.")
return
}
if (!group.isActive) {
Log.d("Loki", "Ignoring closed group info message for inactive group")
return
}
val name = group.title
// Check common group update logic
val members = group.members.map { it.serialize() }
@@ -464,7 +498,14 @@ private fun MessageReceiver.handleClosedGroupMembersRemoved(message: ClosedGroup
if (senderLeft) SignalServiceProtos.GroupContext.Type.QUIT to SignalServiceGroup.Type.QUIT
else SignalServiceProtos.GroupContext.Type.UPDATE to SignalServiceGroup.Type.UPDATE
storage.insertIncomingInfoMessage(context, senderPublicKey, groupID, contextType, signalType, name, members, admins, message.sentTimestamp!!)
// Notify the user
if (userPublicKey == senderPublicKey) {
// sender is a linked device
val threadID = storage.getOrCreateThreadIdFor(Address.fromSerialized(groupID))
storage.insertOutgoingInfoMessage(context, groupID, contextType, name, members, admins, threadID, message.sentTimestamp!!)
} else {
storage.insertIncomingInfoMessage(context, senderPublicKey, groupID, contextType, signalType, name, members, admins, message.sentTimestamp!!)
}
}
private fun MessageReceiver.handleClosedGroupMemberLeft(message: ClosedGroupControlMessage) {
@@ -479,6 +520,10 @@ private fun MessageReceiver.handleClosedGroupMemberLeft(message: ClosedGroupCont
Log.d("Loki", "Ignoring closed group info message for nonexistent group.")
return
}
if (!group.isActive) {
Log.d("Loki", "Ignoring closed group info message for inactive group")
return
}
val name = group.title
// Check common group update logic
val members = group.members.map { it.serialize() }
@@ -489,8 +534,10 @@ private fun MessageReceiver.handleClosedGroupMemberLeft(message: ClosedGroupCont
// If admin leaves the group is disbanded
val didAdminLeave = admins.contains(senderPublicKey)
val updatedMemberList = members - senderPublicKey
val userLeft = (userPublicKey == senderPublicKey)
if (didAdminLeave) {
if (didAdminLeave || userLeft) {
// admin left the group of linked device left the group
disableLocalGroupAndUnsubscribe(groupPublicKey, groupID, userPublicKey)
} else {
val isCurrentUserAdmin = admins.contains(userPublicKey)
@@ -499,7 +546,14 @@ private fun MessageReceiver.handleClosedGroupMemberLeft(message: ClosedGroupCont
MessageSender.generateAndSendNewEncryptionKeyPair(groupPublicKey, updatedMemberList)
}
}
storage.insertIncomingInfoMessage(context, senderPublicKey, groupID, SignalServiceProtos.GroupContext.Type.QUIT, SignalServiceGroup.Type.QUIT, name, members, admins, message.sentTimestamp!!)
// Notify the user
if (userLeft) {
//sender is a linked device
val threadID = storage.getOrCreateThreadIdFor(Address.fromSerialized(groupID))
storage.insertOutgoingInfoMessage(context, groupID, SignalServiceProtos.GroupContext.Type.QUIT, name, members, admins, threadID, message.sentTimestamp!!)
} else {
storage.insertIncomingInfoMessage(context, senderPublicKey, groupID, SignalServiceProtos.GroupContext.Type.QUIT, SignalServiceGroup.Type.QUIT, name, members, admins, message.sentTimestamp!!)
}
}
private fun MessageReceiver.handleClosedGroupEncryptionKeyPairRequest(message: ClosedGroupControlMessage) {

View File

@@ -153,10 +153,9 @@ object MessageSender {
}
val recipient = message.recipient!!
val base64EncodedData = Base64.encodeBytes(wrappedMessage)
val timestamp = System.currentTimeMillis()
val nonce = ProofOfWork.calculate(base64EncodedData, recipient, timestamp, message.ttl.toInt()) ?: throw Error.ProofOfWorkCalculationFailed
val nonce = ProofOfWork.calculate(base64EncodedData, recipient, message.sentTimestamp!!, message.ttl.toInt()) ?: throw Error.ProofOfWorkCalculationFailed
// Send the result
val snodeMessage = SnodeMessage(recipient, base64EncodedData, message.ttl, timestamp, nonce)
val snodeMessage = SnodeMessage(recipient, base64EncodedData, message.ttl, message.sentTimestamp!!, nonce)
if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend) {
SnodeConfiguration.shared.broadcaster.broadcast("sendingMessage", message.sentTimestamp!!)
}
@@ -179,8 +178,8 @@ object MessageSender {
if (shouldNotify) {
val notifyPNServerJob = NotifyPNServerJob(snodeMessage)
JobQueue.shared.add(notifyPNServerJob)
deferred.resolve(Unit)
}
deferred.resolve(Unit)
}
promise.fail {
errorCount += 1
@@ -337,7 +336,7 @@ object MessageSender {
}
@JvmStatic
fun explicitLeave(groupPublicKey: String): Promise<Unit, Exception> {
return leave(groupPublicKey)
fun explicitLeave(groupPublicKey: String, notifyUser: Boolean): Promise<Unit, Exception> {
return leave(groupPublicKey, notifyUser)
}
}

View File

@@ -5,22 +5,20 @@ package org.session.libsession.messaging.sending_receiving
import com.google.protobuf.ByteString
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.messages.control.ClosedGroupControlMessage
import org.session.libsession.messaging.sending_receiving.notifications.PushNotificationAPI
import org.session.libsession.messaging.sending_receiving.MessageSender.Error
import org.session.libsession.messaging.sending_receiving.notifications.PushNotificationAPI
import org.session.libsession.messaging.threads.Address
import org.session.libsession.utilities.GroupUtil
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsignal.utilities.Hex
import org.session.libsignal.libsignal.ecc.Curve
import org.session.libsignal.libsignal.ecc.ECKeyPair
import org.session.libsignal.libsignal.util.guava.Optional
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.service.loki.utilities.hexEncodedPublicKey
import org.session.libsignal.service.loki.utilities.removing05PrefixIfNeeded
import org.session.libsignal.utilities.Hex
import org.session.libsignal.utilities.ThreadUtils
import org.session.libsignal.utilities.logging.Log
import java.util.*
@@ -211,6 +209,7 @@ fun MessageSender.leave(groupPublicKey: String, notifyUser: Boolean = true): Pro
val closedGroupControlMessage = ClosedGroupControlMessage(ClosedGroupControlMessage.Kind.MemberLeft())
val sentTime = System.currentTimeMillis()
closedGroupControlMessage.sentTimestamp = sentTime
storage.setActive(groupID, false)
sendNonDurably(closedGroupControlMessage, Address.fromSerialized(groupID)).success {
// Notify the user
val infoType = SignalServiceProtos.GroupContext.Type.QUIT
@@ -221,6 +220,8 @@ fun MessageSender.leave(groupPublicKey: String, notifyUser: Boolean = true): Pro
// Remove the group private key and unsubscribe from PNs
MessageReceiver.disableLocalGroupAndUnsubscribe(groupPublicKey, groupID, userPublicKey)
deferred.resolve(Unit)
}.fail {
storage.setActive(groupID, true)
}
}
return deferred.promise
@@ -291,4 +292,32 @@ fun MessageSender.requestEncryptionKeyPair(groupPublicKey: String) {
val closedGroupControlMessage = ClosedGroupControlMessage(ClosedGroupControlMessage.Kind.EncryptionKeyPairRequest())
closedGroupControlMessage.sentTimestamp = sentTime
send(closedGroupControlMessage, Address.fromSerialized(groupID))
}
fun MessageSender.sendLatestEncryptionKeyPair(publicKey: String, groupPublicKey: String) {
val storage = MessagingConfiguration.shared.storage
val groupID = GroupUtil.doubleEncodeGroupID(groupPublicKey)
val group = storage.getGroup(groupID) ?: run {
Log.d("Loki", "Can't send encryption key pair for nonexistent closed group.")
throw Error.NoThread
}
val members = group.members.map { it.serialize() }
if (!members.contains(publicKey)) {
Log.d("Loki", "Refusing to send latest encryption key pair to non-member.")
return
}
// Get the latest encryption key pair
val encryptionKeyPair = pendingKeyPair[groupPublicKey]?.orNull()
?: storage.getLatestClosedGroupEncryptionKeyPair(groupPublicKey) ?: return
// Send it
val proto = SignalServiceProtos.KeyPair.newBuilder()
proto.publicKey = ByteString.copyFrom(encryptionKeyPair.publicKey.serialize().removing05PrefixIfNeeded())
proto.privateKey = ByteString.copyFrom(encryptionKeyPair.privateKey.serialize())
val plaintext = proto.build().toByteArray()
val ciphertext = MessageSenderEncryption.encryptWithSessionProtocol(plaintext, publicKey)
Log.d("Loki", "Sending latest encryption key pair to: $publicKey.")
val wrapper = ClosedGroupControlMessage.KeyPairWrapper(publicKey, ByteString.copyFrom(ciphertext))
val kind = ClosedGroupControlMessage.Kind.EncryptionKeyPair(ByteString.copyFrom(Hex.fromStringCondensed(groupPublicKey)), listOf(wrapper))
val closedGroupControlMessage = ClosedGroupControlMessage(kind)
MessageSender.send(closedGroupControlMessage, Address.fromSerialized(publicKey))
}

View File

@@ -169,4 +169,25 @@ public class PointerAttachment extends Attachment {
thumbnail != null ? thumbnail.asPointer().getCaption().orNull() : null,
thumbnail != null ? thumbnail.asPointer().getUrl() : ""));
}
/**
* Converts a Session Attachment to a Signal Attachment
* @param attachment Session Attachment
* @return Signal Attachment
*/
public static Attachment forAttachment(org.session.libsession.messaging.messages.visible.Attachment attachment) {
return new PointerAttachment(attachment.getContentType(),
AttachmentTransferProgress.TRANSFER_PROGRESS_PENDING,
attachment.getSizeInBytes(),
attachment.getFileName(),
null, Base64.encodeBytes(attachment.getKey()),
null,
attachment.getDigest(),
null,
attachment.getKind() == org.session.libsession.messaging.messages.visible.Attachment.Kind.VOICE_MESSAGE,
attachment.getSize() != null ? attachment.getSize().getWidth() : 0,
attachment.getSize() != null ? attachment.getSize().getHeight() : 0,
attachment.getCaption(),
attachment.getUrl());
}
}

View File

@@ -4,17 +4,15 @@ import android.os.Handler
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.utilities.MessageWrapper
import org.session.libsession.snode.SnodeAPI
import org.session.libsignal.utilities.successBackground
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.utilities.Base64
import org.session.libsignal.service.loki.utilities.getRandomElementOrNull
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.utilities.successBackground
class ClosedGroupPoller {
private var isPolling = false
@@ -24,7 +22,7 @@ class ClosedGroupPoller {
override fun run() {
poll()
handler.postDelayed(this, ClosedGroupPoller.pollInterval)
handler.postDelayed(this, pollInterval)
}
}
@@ -61,7 +59,7 @@ class ClosedGroupPoller {
// region Private API
private fun poll(): List<Promise<Unit, Exception>> {
if (!isPolling) { return listOf() }
val publicKeys = MessagingConfiguration.shared.storage.getAllClosedGroupPublicKeys()
val publicKeys = MessagingConfiguration.shared.storage.getAllActiveClosedGroupPublicKeys()
return publicKeys.map { publicKey ->
val promise = SnodeAPI.getSwarm(publicKey).bind { swarm ->
val snode = swarm.getRandomElementOrNull() ?: throw InsufficientSnodesException() // Should be cryptographically secure
@@ -69,6 +67,10 @@ class ClosedGroupPoller {
SnodeAPI.getRawMessages(snode, publicKey).map {SnodeAPI.parseRawMessagesResponse(it, snode, publicKey) }
}
promise.successBackground { messages ->
if (!MessagingConfiguration.shared.storage.isGroupActive(publicKey)) {
// ignore inactive group's messages
return@successBackground
}
if (messages.isNotEmpty()) {
Log.d("Loki", "Received ${messages.count()} new message(s) in closed group with public key: $publicKey.")
}

View File

@@ -1,8 +1,6 @@
package org.session.libsession.messaging.sending_receiving.pollers
import android.os.Handler
import com.google.protobuf.ByteString
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
import org.session.libsession.messaging.MessagingConfiguration
@@ -11,61 +9,30 @@ import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.opengroups.OpenGroup
import org.session.libsession.messaging.opengroups.OpenGroupAPI
import org.session.libsession.messaging.opengroups.OpenGroupMessage
import org.session.libsignal.utilities.successBackground
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.service.internal.push.SignalServiceProtos.*
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.utilities.successBackground
import java.util.*
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
class OpenGroupPoller(private val openGroup: OpenGroup, private val executorService: ScheduledExecutorService? = null) {
class OpenGroupPoller(private val openGroup: OpenGroup) {
private val handler by lazy { Handler() }
private var hasStarted = false
private var isPollOngoing = false
public var isCaughtUp = false
@Volatile private var isPollOngoing = false
var isCaughtUp = false
private val cancellableFutures = mutableListOf<ScheduledFuture<out Any>>()
// region Convenience
private val userHexEncodedPublicKey = MessagingConfiguration.shared.storage.getUserPublicKey() ?: ""
private var displayNameUpdatees = setOf<String>()
// endregion
// region Tasks
private val pollForNewMessagesTask = object : Runnable {
override fun run() {
pollForNewMessages()
handler.postDelayed(this, pollForNewMessagesInterval)
}
}
private val pollForDeletedMessagesTask = object : Runnable {
override fun run() {
pollForDeletedMessages()
handler.postDelayed(this, pollForDeletedMessagesInterval)
}
}
private val pollForModeratorsTask = object : Runnable {
override fun run() {
pollForModerators()
handler.postDelayed(this, pollForModeratorsInterval)
}
}
private val pollForDisplayNamesTask = object : Runnable {
override fun run() {
pollForDisplayNames()
handler.postDelayed(this, pollForDisplayNamesInterval)
}
}
private var displayNameUpdates = setOf<String>()
// endregion
// region Settings
companion object {
private val pollForNewMessagesInterval: Long = 4 * 1000
private val pollForNewMessagesInterval: Long = 10 * 1000
private val pollForDeletedMessagesInterval: Long = 60 * 1000
private val pollForModeratorsInterval: Long = 10 * 60 * 1000
private val pollForDisplayNamesInterval: Long = 60 * 1000
@@ -74,19 +41,21 @@ class OpenGroupPoller(private val openGroup: OpenGroup) {
// region Lifecycle
fun startIfNeeded() {
if (hasStarted) return
pollForNewMessagesTask.run()
pollForDeletedMessagesTask.run()
pollForModeratorsTask.run()
pollForDisplayNamesTask.run()
if (hasStarted || executorService == null) return
cancellableFutures += listOf(
executorService.scheduleAtFixedRate(::pollForNewMessages,0, pollForNewMessagesInterval, TimeUnit.MILLISECONDS),
executorService.scheduleAtFixedRate(::pollForDeletedMessages,0, pollForDeletedMessagesInterval, TimeUnit.MILLISECONDS),
executorService.scheduleAtFixedRate(::pollForModerators,0, pollForModeratorsInterval, TimeUnit.MILLISECONDS),
executorService.scheduleAtFixedRate(::pollForDisplayNames,0, pollForDisplayNamesInterval, TimeUnit.MILLISECONDS)
)
hasStarted = true
}
fun stop() {
handler.removeCallbacks(pollForNewMessagesTask)
handler.removeCallbacks(pollForDeletedMessagesTask)
handler.removeCallbacks(pollForModeratorsTask)
handler.removeCallbacks(pollForDisplayNamesTask)
cancellableFutures.forEach { future ->
future.cancel(false)
}
cancellableFutures.clear()
hasStarted = false
}
// endregion
@@ -96,120 +65,129 @@ class OpenGroupPoller(private val openGroup: OpenGroup) {
return pollForNewMessages(false)
}
fun pollForNewMessages(isBackgroundPoll: Boolean): Promise<Unit, Exception> {
private fun pollForNewMessages(isBackgroundPoll: Boolean): Promise<Unit, Exception> {
if (isPollOngoing) { return Promise.of(Unit) }
isPollOngoing = true
val deferred = deferred<Unit, Exception>()
// Kovenant propagates a context to chained promises, so OpenGroupAPI.sharedContext should be used for all of the below
OpenGroupAPI.getMessages(openGroup.channel, openGroup.server).successBackground { messages ->
// Process messages in the background
Log.d("Loki", "received ${messages.size} messages")
messages.forEach { message ->
val senderPublicKey = message.senderPublicKey
val wasSentByCurrentUser = (senderPublicKey == userHexEncodedPublicKey)
fun generateDisplayName(rawDisplayName: String): String {
return "${rawDisplayName} (${senderPublicKey.takeLast(8)})"
}
val senderDisplayName = MessagingConfiguration.shared.storage.getOpenGroupDisplayName(senderPublicKey, openGroup.channel, openGroup.server) ?: generateDisplayName("Anonymous")
val id = openGroup.id.toByteArray()
// Main message
val dataMessageProto = DataMessage.newBuilder()
val body = if (message.body == message.timestamp.toString()) { "" } else { message.body }
dataMessageProto.setBody(body)
dataMessageProto.setTimestamp(message.timestamp)
// Attachments
val attachmentProtos = message.attachments.mapNotNull { attachment ->
if (attachment.kind != OpenGroupMessage.Attachment.Kind.Attachment) { return@mapNotNull null }
val attachmentProto = AttachmentPointer.newBuilder()
attachmentProto.setId(attachment.serverID)
attachmentProto.setContentType(attachment.contentType)
attachmentProto.setSize(attachment.size)
attachmentProto.setFileName(attachment.fileName)
attachmentProto.setFlags(attachment.flags)
attachmentProto.setWidth(attachment.width)
attachmentProto.setHeight(attachment.height)
attachment.caption.let { attachmentProto.setCaption(it) }
attachmentProto.setUrl(attachment.url)
attachmentProto.build()
}
dataMessageProto.addAllAttachments(attachmentProtos)
// Link preview
val linkPreview = message.attachments.firstOrNull { it.kind == OpenGroupMessage.Attachment.Kind.LinkPreview }
if (linkPreview != null) {
val linkPreviewProto = DataMessage.Preview.newBuilder()
linkPreviewProto.setUrl(linkPreview.linkPreviewURL!!)
linkPreviewProto.setTitle(linkPreview.linkPreviewTitle!!)
val attachmentProto = AttachmentPointer.newBuilder()
attachmentProto.setId(linkPreview.serverID)
attachmentProto.setContentType(linkPreview.contentType)
attachmentProto.setSize(linkPreview.size)
attachmentProto.setFileName(linkPreview.fileName)
attachmentProto.setFlags(linkPreview.flags)
attachmentProto.setWidth(linkPreview.width)
attachmentProto.setHeight(linkPreview.height)
linkPreview.caption.let { attachmentProto.setCaption(it) }
attachmentProto.setUrl(linkPreview.url)
linkPreviewProto.setImage(attachmentProto.build())
dataMessageProto.addPreview(linkPreviewProto.build())
}
// Quote
val quote = message.quote
if (quote != null) {
val quoteProto = DataMessage.Quote.newBuilder()
quoteProto.setId(quote.quotedMessageTimestamp)
quoteProto.setAuthor(quote.quoteePublicKey)
if (quote.quotedMessageBody != quote.quotedMessageTimestamp.toString()) { quoteProto.setText(quote.quotedMessageBody) }
dataMessageProto.setQuote(quoteProto.build())
}
val messageServerID = message.serverID
// Profile
val profileProto = DataMessage.LokiProfile.newBuilder()
profileProto.setDisplayName(message.displayName)
val profilePicture = message.profilePicture
if (profilePicture != null) {
profileProto.setProfilePicture(profilePicture.url)
dataMessageProto.setProfileKey(ByteString.copyFrom(profilePicture.profileKey))
}
dataMessageProto.setProfile(profileProto.build())
/* TODO: the signal service proto needs to be synced with iOS
// Open group info
if (messageServerID != null) {
val openGroupProto = PublicChatInfo.newBuilder()
openGroupProto.setServerID(messageServerID)
dataMessageProto.setPublicChatInfo(openGroupProto.build())
}
*/
// Signal group context
val groupProto = GroupContext.newBuilder()
groupProto.setId(ByteString.copyFrom(id))
groupProto.setType(GroupContext.Type.DELIVER)
groupProto.setName(openGroup.displayName)
dataMessageProto.setGroup(groupProto.build())
// Sync target
if (wasSentByCurrentUser) {
dataMessageProto.setSyncTarget(openGroup.id)
}
// Content
val content = Content.newBuilder()
content.setDataMessage(dataMessageProto.build())
// Envelope
val builder = Envelope.newBuilder()
builder.type = Envelope.Type.UNIDENTIFIED_SENDER
builder.source = senderPublicKey
builder.sourceDevice = 1
builder.setContent(content.build().toByteString())
builder.serverTimestamp = message.serverTimestamp
val envelope = builder.build()
val job = MessageReceiveJob(envelope.toByteArray(), isBackgroundPoll, messageServerID, openGroup.id)
if (isBackgroundPoll) {
job.executeAsync().success { deferred.resolve(Unit) }.fail { deferred.resolve(Unit) }
// The promise is just used to keep track of when we're done
} else {
JobQueue.shared.add(job)
deferred.resolve(Unit)
try {
val senderPublicKey = message.senderPublicKey
fun generateDisplayName(rawDisplayName: String): String {
return "$rawDisplayName (...${senderPublicKey.takeLast(8)})"
}
val senderDisplayName = MessagingConfiguration.shared.storage.getOpenGroupDisplayName(senderPublicKey, openGroup.channel, openGroup.server) ?: generateDisplayName(message.displayName)
val id = openGroup.id.toByteArray()
// Main message
val dataMessageProto = DataMessage.newBuilder()
val body = if (message.body == message.timestamp.toString()) { "" } else { message.body }
dataMessageProto.setBody(body)
dataMessageProto.setTimestamp(message.timestamp)
// Attachments
val attachmentProtos = message.attachments.mapNotNull { attachment ->
try {
if (attachment.kind != OpenGroupMessage.Attachment.Kind.Attachment) { return@mapNotNull null }
val attachmentProto = AttachmentPointer.newBuilder()
attachmentProto.setId(attachment.serverID)
attachmentProto.setContentType(attachment.contentType)
attachmentProto.setSize(attachment.size)
attachmentProto.setFileName(attachment.fileName)
attachmentProto.setFlags(attachment.flags)
attachmentProto.setWidth(attachment.width)
attachmentProto.setHeight(attachment.height)
attachment.caption?.let { attachmentProto.setCaption(it) }
attachmentProto.setUrl(attachment.url)
attachmentProto.build()
} catch (e: Exception) {
Log.e("Loki","Failed to parse attachment as proto",e)
null
}
}
dataMessageProto.addAllAttachments(attachmentProtos)
// Link preview
val linkPreview = message.attachments.firstOrNull { it.kind == OpenGroupMessage.Attachment.Kind.LinkPreview }
if (linkPreview != null) {
val linkPreviewProto = DataMessage.Preview.newBuilder()
linkPreviewProto.setUrl(linkPreview.linkPreviewURL!!)
linkPreviewProto.setTitle(linkPreview.linkPreviewTitle!!)
val attachmentProto = AttachmentPointer.newBuilder()
attachmentProto.setId(linkPreview.serverID)
attachmentProto.setContentType(linkPreview.contentType)
attachmentProto.setSize(linkPreview.size)
attachmentProto.setFileName(linkPreview.fileName)
attachmentProto.setFlags(linkPreview.flags)
attachmentProto.setWidth(linkPreview.width)
attachmentProto.setHeight(linkPreview.height)
linkPreview.caption?.let { attachmentProto.setCaption(it) }
attachmentProto.setUrl(linkPreview.url)
linkPreviewProto.setImage(attachmentProto.build())
dataMessageProto.addPreview(linkPreviewProto.build())
}
// Quote
val quote = message.quote
if (quote != null) {
val quoteProto = DataMessage.Quote.newBuilder()
quoteProto.setId(quote.quotedMessageTimestamp)
quoteProto.setAuthor(quote.quoteePublicKey)
if (quote.quotedMessageBody != quote.quotedMessageTimestamp.toString()) { quoteProto.setText(quote.quotedMessageBody) }
dataMessageProto.setQuote(quoteProto.build())
}
val messageServerID = message.serverID
// Profile
val profileProto = DataMessage.LokiProfile.newBuilder()
profileProto.setDisplayName(senderDisplayName)
val profilePicture = message.profilePicture
if (profilePicture != null) {
profileProto.setProfilePicture(profilePicture.url)
dataMessageProto.setProfileKey(ByteString.copyFrom(profilePicture.profileKey))
}
dataMessageProto.setProfile(profileProto.build())
/* TODO: the signal service proto needs to be synced with iOS
// Open group info
if (messageServerID != null) {
val openGroupProto = PublicChatInfo.newBuilder()
openGroupProto.setServerID(messageServerID)
dataMessageProto.setPublicChatInfo(openGroupProto.build())
}
*/
// Signal group context
val groupProto = GroupContext.newBuilder()
groupProto.setId(ByteString.copyFrom(id))
groupProto.setType(GroupContext.Type.DELIVER)
groupProto.setName(openGroup.displayName)
dataMessageProto.setGroup(groupProto.build())
// Content
val content = Content.newBuilder()
content.setDataMessage(dataMessageProto.build())
// Envelope
val builder = Envelope.newBuilder()
builder.type = Envelope.Type.UNIDENTIFIED_SENDER
builder.source = senderPublicKey
builder.sourceDevice = 1
builder.setContent(content.build().toByteString())
builder.timestamp = message.timestamp
builder.serverTimestamp = message.serverTimestamp
val envelope = builder.build()
val job = MessageReceiveJob(envelope.toByteArray(), isBackgroundPoll, messageServerID, openGroup.id)
Log.d("Loki", "Scheduling Job $job")
if (isBackgroundPoll) {
job.executeAsync().always { deferred.resolve(Unit) }
// The promise is just used to keep track of when we're done
} else {
JobQueue.shared.add(job)
}
} catch (e: Exception) {
Log.e("Loki", "Exception parsing message", e)
}
}
displayNameUpdates = displayNameUpdates + messages.map { it.senderPublicKey }.toSet() - userHexEncodedPublicKey
executorService?.schedule(::pollForDisplayNames, 0, TimeUnit.MILLISECONDS)
isCaughtUp = true
isPollOngoing = false
deferred.resolve(Unit)
}.fail {
Log.d("Loki", "Failed to get messages for group chat with ID: ${openGroup.channel} on server: ${openGroup.server}.")
isPollOngoing = false
@@ -218,16 +196,17 @@ class OpenGroupPoller(private val openGroup: OpenGroup) {
}
private fun pollForDisplayNames() {
if (displayNameUpdatees.isEmpty()) { return }
val hexEncodedPublicKeys = displayNameUpdatees
displayNameUpdatees = setOf()
if (displayNameUpdates.isEmpty()) { return }
val hexEncodedPublicKeys = displayNameUpdates
displayNameUpdates = setOf()
OpenGroupAPI.getDisplayNames(hexEncodedPublicKeys, openGroup.server).successBackground { mapping ->
for (pair in mapping.entries) {
val senderDisplayName = "${pair.value} (...${pair.key.takeLast(8)})"
if (pair.key == userHexEncodedPublicKey) continue
val senderDisplayName = "${pair.value} (...${pair.key.substring(pair.key.count() - 8)})"
MessagingConfiguration.shared.storage.setOpenGroupDisplayName(pair.key, openGroup.channel, openGroup.server, senderDisplayName)
}
}.fail {
displayNameUpdatees = displayNameUpdatees.union(hexEncodedPublicKeys)
displayNameUpdates = displayNameUpdates.union(hexEncodedPublicKeys)
}
}

View File

@@ -2,25 +2,22 @@ package org.session.libsession.messaging.sending_receiving.pollers
import nl.komponents.kovenant.*
import nl.komponents.kovenant.functional.bind
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.utilities.MessageWrapper
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeConfiguration
import org.session.libsignal.service.loki.api.Snode
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.logging.Log
import java.security.SecureRandom
import java.util.*
private class PromiseCanceledException : Exception("Promise canceled.")
class Poller {
private val userPublicKey = MessagingConfiguration.shared.storage.getUserPublicKey() ?: ""
var userPublicKey = MessagingConfiguration.shared.storage.getUserPublicKey() ?: ""
private var hasStarted: Boolean = false
private val usedSnodes: MutableSet<Snode> = mutableSetOf()
public var isCaughtUp = false

View File

@@ -2,11 +2,11 @@ package org.session.libsession.snode
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
import org.session.libsignal.utilities.JsonUtil
import org.session.libsession.utilities.AESGCM.EncryptionResult
import org.session.libsession.utilities.AESGCM
import org.session.libsignal.utilities.ThreadUtils
import org.session.libsession.utilities.AESGCM.EncryptionResult
import org.session.libsignal.service.loki.utilities.toHexString
import org.session.libsignal.utilities.JsonUtil
import org.session.libsignal.utilities.ThreadUtils
import java.nio.Buffer
import java.nio.ByteBuffer
import java.nio.ByteOrder
@@ -62,7 +62,7 @@ object OnionRequestEncryption {
*/
internal fun encryptHop(lhs: OnionRequestAPI.Destination, rhs: OnionRequestAPI.Destination, previousEncryptionResult: EncryptionResult): Promise<EncryptionResult, Exception> {
val deferred = deferred<EncryptionResult, Exception>()
Thread {
ThreadUtils.queue {
try {
val payload: MutableMap<String, Any>
when (rhs) {
@@ -89,7 +89,7 @@ object OnionRequestEncryption {
} catch (exception: Exception) {
deferred.reject(exception)
}
}.start()
}
return deferred.promise
}
}

View File

@@ -2,21 +2,19 @@
package org.session.libsession.snode
import android.os.Build
import nl.komponents.kovenant.*
import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map
import org.session.libsession.snode.utilities.getRandomElement
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.service.loki.api.utilities.HTTP
import org.session.libsignal.service.loki.api.Snode
import org.session.libsignal.service.loki.api.utilities.HTTP
import org.session.libsignal.service.loki.database.LokiAPIDatabaseProtocol
import org.session.libsignal.service.loki.utilities.Broadcaster
import org.session.libsignal.service.loki.utilities.prettifiedDescription
import org.session.libsignal.service.loki.utilities.retryIfNeeded
import org.session.libsignal.utilities.*
import org.session.libsignal.utilities.logging.Log
import java.security.SecureRandom
object SnodeAPI {
@@ -36,7 +34,14 @@ object SnodeAPI {
private val maxRetryCount = 6
private val minimumSnodePoolCount = 64
private val minimumSwarmSnodeCount = 2
private val seedNodePool: Set<String> = setOf( "https://storage.seed1.loki.network", "https://storage.seed3.loki.network", "https://public.loki.foundation" )
// use port 4433 if API level can handle network security config and enforce pinned certificates
private val seedPort = if (Build.VERSION.SDK_INT < Build.VERSION_CODES.N) 443 else 4433
private val seedNodePool: Set<String> = setOf(
"https://storage.seed1.loki.network:$seedPort",
"https://storage.seed3.loki.network:$seedPort",
"https://public.loki.foundation:$seedPort"
)
internal val snodeFailureThreshold = 4
private val targetSwarmSnodeCount = 2

View File

@@ -25,6 +25,11 @@ public class Debouncer {
this.threshold = threshold;
}
public Debouncer(Handler handler, long threshold) {
this.handler = handler;
this.threshold = threshold;
}
public void publish(Runnable runnable) {
handler.removeCallbacksAndMessages(null);
handler.postDelayed(runnable, threshold);

View File

@@ -1,6 +1,7 @@
package org.session.libsession.utilities
import android.content.Context
import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
import org.session.libsession.messaging.sending_receiving.notifications.MessageNotifier
import org.session.libsession.messaging.threads.Address
import org.session.libsession.messaging.threads.recipients.Recipient
@@ -36,8 +37,8 @@ class SSKEnvironment(
}
interface MessageExpirationManagerProtocol {
fun setExpirationTimer(messageID: Long?, duration: Int, senderPublicKey: String, content: SignalServiceProtos.Content)
fun disableExpirationTimer(messageID: Long?, senderPublicKey: String, content: SignalServiceProtos.Content)
fun setExpirationTimer(message: ExpirationTimerUpdate)
fun disableExpirationTimer(message: ExpirationTimerUpdate)
fun startAnyExpiration(timestamp: Long, author: String)
}