feat: add new message receive pipeline in important places and fix parsing issues

This commit is contained in:
jubb 2021-03-09 17:26:29 +11:00
parent 1a6da88ce3
commit ca7202f255
11 changed files with 47 additions and 78 deletions

View File

@ -34,23 +34,28 @@ import com.google.firebase.iid.FirebaseInstanceId;
import org.conscrypt.Conscrypt;
import org.session.libsession.messaging.MessagingConfiguration;
import org.session.libsession.messaging.avatars.AvatarHelper;
import org.session.libsession.messaging.sending_receiving.notifications.MessageNotifier;
import org.session.libsession.messaging.sending_receiving.pollers.Poller;
import org.session.libsession.messaging.threads.Address;
import org.session.libsession.snode.SnodeConfiguration;
import org.session.libsession.utilities.SSKEnvironment;
import org.session.libsession.messaging.sending_receiving.notifications.MessageNotifier;
import org.session.libsession.utilities.dynamiclanguage.DynamicLanguageContextWrapper;
import org.session.libsession.utilities.TextSecurePreferences;
import org.session.libsession.utilities.Util;
import org.session.libsession.utilities.dynamiclanguage.DynamicLanguageContextWrapper;
import org.session.libsession.utilities.dynamiclanguage.LocaleParser;
import org.session.libsession.utilities.preferences.ProfileKeyUtil;
import org.session.libsignal.service.api.util.StreamDetails;
import org.session.libsignal.service.loki.api.PushNotificationAPI;
import org.session.libsignal.service.loki.api.SnodeAPI;
import org.session.libsignal.service.loki.api.SwarmAPI;
import org.session.libsignal.service.loki.api.fileserver.FileServerAPI;
import org.session.libsignal.service.loki.api.opengroups.PublicChatAPI;
import org.session.libsignal.service.loki.database.LokiAPIDatabaseProtocol;
import org.session.libsignal.service.loki.utilities.mentions.MentionsManager;
import org.session.libsignal.utilities.logging.Log;
import org.signal.aesgcmprovider.AesGcmProvider;
import org.thoughtcrime.securesms.loki.api.SessionProtocolImpl;
import org.thoughtcrime.securesms.sskenvironment.ProfileManager;
import org.thoughtcrime.securesms.sskenvironment.ReadReceiptManager;
import org.thoughtcrime.securesms.sskenvironment.TypingStatusRepository;
import org.thoughtcrime.securesms.components.TypingStatusSender;
import org.thoughtcrime.securesms.crypto.IdentityKeyUtil;
import org.session.libsession.utilities.preferences.ProfileKeyUtil;
import org.session.libsession.messaging.threads.Address;
import org.thoughtcrime.securesms.database.DatabaseFactory;
import org.thoughtcrime.securesms.database.GroupDatabase;
import org.thoughtcrime.securesms.dependencies.InjectableType;
@ -60,9 +65,7 @@ import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer;
import org.thoughtcrime.securesms.jobs.FastJobStorage;
import org.thoughtcrime.securesms.jobs.JobManagerFactories;
import org.thoughtcrime.securesms.jobs.PushContentReceiveJob;
import org.thoughtcrime.securesms.logging.AndroidLogger;
import org.session.libsignal.utilities.logging.Log;
import org.thoughtcrime.securesms.logging.PersistentLogger;
import org.thoughtcrime.securesms.logging.UncaughtExceptionLogger;
import org.thoughtcrime.securesms.loki.activities.HomeActivity;
@ -70,6 +73,7 @@ import org.thoughtcrime.securesms.loki.api.BackgroundPollWorker;
import org.thoughtcrime.securesms.loki.api.ClosedGroupPoller;
import org.thoughtcrime.securesms.loki.api.LokiPushNotificationManager;
import org.thoughtcrime.securesms.loki.api.PublicChatManager;
import org.thoughtcrime.securesms.loki.api.SessionProtocolImpl;
import org.thoughtcrime.securesms.loki.database.LokiAPIDatabase;
import org.thoughtcrime.securesms.loki.database.LokiThreadDatabase;
import org.thoughtcrime.securesms.loki.database.LokiUserDatabase;
@ -84,22 +88,14 @@ import org.thoughtcrime.securesms.service.ExpiringMessageManager;
import org.thoughtcrime.securesms.service.KeyCachingService;
import org.thoughtcrime.securesms.service.LocalBackupListener;
import org.thoughtcrime.securesms.service.UpdateApkRefreshListener;
import org.thoughtcrime.securesms.sskenvironment.ProfileManager;
import org.thoughtcrime.securesms.sskenvironment.ReadReceiptManager;
import org.thoughtcrime.securesms.sskenvironment.TypingStatusRepository;
import org.thoughtcrime.securesms.util.dynamiclanguage.LocaleParseHelper;
import org.webrtc.PeerConnectionFactory;
import org.webrtc.PeerConnectionFactory.InitializationOptions;
import org.webrtc.voiceengine.WebRtcAudioManager;
import org.webrtc.voiceengine.WebRtcAudioUtils;
import org.session.libsignal.service.api.messages.SignalServiceEnvelope;
import org.session.libsignal.service.api.util.StreamDetails;
import org.session.libsignal.service.internal.push.SignalServiceProtos;
import org.session.libsignal.service.loki.api.Poller;
import org.session.libsignal.service.loki.api.PushNotificationAPI;
import org.session.libsignal.service.loki.api.SnodeAPI;
import org.session.libsignal.service.loki.api.SwarmAPI;
import org.session.libsignal.service.loki.api.fileserver.FileServerAPI;
import org.session.libsignal.service.loki.api.opengroups.PublicChatAPI;
import org.session.libsignal.service.loki.database.LokiAPIDatabaseProtocol;
import org.session.libsignal.service.loki.utilities.mentions.MentionsManager;
import java.io.File;
import java.io.FileInputStream;
@ -456,15 +452,9 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc
return;
}
LokiAPIDatabase apiDB = DatabaseFactory.getLokiAPIDatabase(this);
Context context = this;
SwarmAPI.Companion.configureIfNeeded(apiDB);
SnodeAPI.Companion.configureIfNeeded(userPublicKey, apiDB, broadcaster);
poller = new Poller(userPublicKey, apiDB, envelopes -> {
for (SignalServiceProtos.Envelope envelope : envelopes) {
new PushContentReceiveJob(context).processEnvelope(new SignalServiceEnvelope(envelope), false);
}
return Unit.INSTANCE;
});
poller = new Poller();
ClosedGroupPoller.Companion.configureIfNeeded(this);
closedGroupPoller = ClosedGroupPoller.Companion.getShared();
}

View File

@ -6,14 +6,13 @@ import android.os.Looper;
import androidx.annotation.MainThread;
import androidx.annotation.NonNull;
import org.session.libsession.messaging.sending_receiving.notifications.MessageNotifier;
import org.session.libsession.messaging.sending_receiving.pollers.Poller;
import org.session.libsession.messaging.threads.recipients.Recipient;
import org.session.libsession.utilities.Debouncer;
import org.session.libsignal.utilities.ThreadUtils;
import org.thoughtcrime.securesms.ApplicationContext;
import org.thoughtcrime.securesms.loki.api.PublicChatManager;
import org.session.libsession.utilities.Debouncer;
import org.session.libsignal.service.loki.api.Poller;
import org.session.libsignal.utilities.ThreadUtils;
import org.session.libsession.messaging.threads.recipients.Recipient;
import org.session.libsession.messaging.sending_receiving.notifications.MessageNotifier;
import java.util.concurrent.TimeUnit;

View File

@ -4,11 +4,11 @@ import com.google.protobuf.ByteString
import org.session.libsignal.libsignal.ecc.DjbECPrivateKey
import org.session.libsignal.libsignal.ecc.DjbECPublicKey
import org.session.libsignal.libsignal.ecc.ECKeyPair
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.service.internal.push.SignalServiceProtos.DataMessage
import org.session.libsignal.service.loki.utilities.toHexString
import org.session.libsignal.utilities.Hex
import org.session.libsignal.utilities.logging.Log
class ClosedGroupControlMessage() : ControlMessage() {
@ -56,7 +56,8 @@ class ClosedGroupControlMessage() : ControlMessage() {
const val TAG = "ClosedGroupControlMessage"
fun fromProto(proto: SignalServiceProtos.Content): ClosedGroupControlMessage? {
val closedGroupControlMessageProto = proto.dataMessage?.closedGroupControlMessage ?: return null
if (!proto.hasDataMessage() || !proto.dataMessage.hasClosedGroupControlMessage()) return null
val closedGroupControlMessageProto = proto.dataMessage?.closedGroupControlMessage!!
val kind: Kind
when(closedGroupControlMessageProto.type) {
DataMessage.ClosedGroupControlMessage.Type.NEW -> {

View File

@ -1,7 +1,7 @@
package org.session.libsession.messaging.messages.control
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.utilities.logging.Log
class ExpirationTimerUpdate() : ControlMessage() {
@ -12,7 +12,7 @@ class ExpirationTimerUpdate() : ControlMessage() {
const val TAG = "ExpirationTimerUpdate"
fun fromProto(proto: SignalServiceProtos.Content): ExpirationTimerUpdate? {
val dataMessageProto = proto.dataMessage ?: return null
val dataMessageProto = if (proto.hasDataMessage()) proto.dataMessage else return null
val isExpirationTimerUpdate = dataMessageProto.flags.and(SignalServiceProtos.DataMessage.Flags.EXPIRATION_TIMER_UPDATE_VALUE) != 0
if (!isExpirationTimerUpdate) return null
val duration = dataMessageProto.expireTimer

View File

@ -1,7 +1,7 @@
package org.session.libsession.messaging.messages.control
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.utilities.logging.Log
class ReadReceipt() : ControlMessage() {
@ -11,7 +11,7 @@ class ReadReceipt() : ControlMessage() {
const val TAG = "ReadReceipt"
fun fromProto(proto: SignalServiceProtos.Content): ReadReceipt? {
val receiptProto = proto.receiptMessage ?: return null
val receiptProto = if (proto.hasReceiptMessage()) proto.receiptMessage else return null
if (receiptProto.type != SignalServiceProtos.ReceiptMessage.Type.READ) return null
val timestamps = receiptProto.timestampList
if (timestamps.isEmpty()) return null

View File

@ -1,7 +1,7 @@
package org.session.libsession.messaging.messages.control
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.utilities.logging.Log
class TypingIndicator() : ControlMessage() {
@ -11,7 +11,7 @@ class TypingIndicator() : ControlMessage() {
const val TAG = "TypingIndicator"
fun fromProto(proto: SignalServiceProtos.Content): TypingIndicator? {
val typingIndicatorProto = proto.typingMessage ?: return null
val typingIndicatorProto = if (proto.hasTypingMessage()) proto.typingMessage else return null
val kind = Kind.fromProto(typingIndicatorProto.action)
return TypingIndicator(kind = kind)
}

View File

@ -12,7 +12,7 @@ class VisibleMessage : Message() {
var syncTarget: String? = null
var text: String? = null
var attachmentIDs = ArrayList<Long>()
var attachmentIDs:List<Long> = mutableListOf()
var quote: Quote? = null
var linkPreview: LinkPreview? = null
var contact: Contact? = null
@ -24,17 +24,17 @@ class VisibleMessage : Message() {
const val TAG = "VisibleMessage"
fun fromProto(proto: SignalServiceProtos.Content): VisibleMessage? {
val dataMessage = proto.dataMessage ?: return null
val dataMessage = if (proto.hasDataMessage()) proto.dataMessage else return null
val result = VisibleMessage()
result.syncTarget = dataMessage.syncTarget
result.text = dataMessage.body
// Attachments are handled in MessageReceiver
val quoteProto = dataMessage.quote
val quoteProto = if (dataMessage.hasQuote()) dataMessage.quote else null
quoteProto?.let {
val quote = Quote.fromProto(quoteProto)
quote?.let { result.quote = quote }
}
val linkPreviewProto = dataMessage.previewList.first()
val linkPreviewProto = dataMessage.previewList.firstOrNull()
linkPreviewProto?.let {
val linkPreview = LinkPreview.fromProto(linkPreviewProto)
linkPreview?.let { result.linkPreview = linkPreview }

View File

@ -4,7 +4,7 @@ import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.control.*
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsignal.service.internal.push.PushTransportDetails
import org.session.libsignal.service.internal.push.SignalServiceProtos
object MessageReceiver {
@ -94,20 +94,6 @@ object MessageReceiver {
}
groupPublicKey = envelope.source
decrypt()
// try {
// decrypt()
// } catch(error: Exception) {
// val now = System.currentTimeMillis()
// var shouldRequestEncryptionKeyPair = true
// lastEncryptionKeyPairRequest[groupPublicKey!!]?.let {
// shouldRequestEncryptionKeyPair = now - it > 30 * 1000
// }
// if (shouldRequestEncryptionKeyPair) {
// MessageSender.requestEncryptionKeyPair(groupPublicKey)
// lastEncryptionKeyPairRequest[groupPublicKey] = now
// }
// throw error
// }
}
else -> throw Error.UnknownEnvelopeType
}
@ -115,7 +101,7 @@ object MessageReceiver {
// Don't process the envelope any further if the sender is blocked
if (isBlock(sender!!)) throw Error.SenderBlocked
// Parse the proto
val proto = SignalServiceProtos.Content.parseFrom(plaintext)
val proto = SignalServiceProtos.Content.parseFrom(PushTransportDetails.getStrippedPaddingMessageBody(plaintext))
// Parse the message
val message: Message = ReadReceipt.fromProto(proto) ?:
TypingIndicator.fromProto(proto) ?:

View File

@ -21,15 +21,14 @@ import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsignal.libsignal.ecc.DjbECPrivateKey
import org.session.libsignal.libsignal.ecc.DjbECPublicKey
import org.session.libsignal.libsignal.ecc.ECKeyPair
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.libsignal.util.guava.Optional
import org.session.libsignal.service.api.messages.SignalServiceGroup
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.service.loki.utilities.removing05PrefixIfNeeded
import org.session.libsignal.service.loki.utilities.toHexString
import org.session.libsignal.utilities.logging.Log
import java.security.MessageDigest
import java.util.*
import kotlin.collections.ArrayList
internal fun MessageReceiver.isBlock(publicKey: String): Boolean {
val context = MessagingConfiguration.shared.context
@ -134,8 +133,7 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
}
}
val attachmentIDs = storage.persistAttachments(message.id ?: 0, attachments)
message.attachmentIDs = attachmentIDs as ArrayList<Long>
var attachmentsToDownload = attachmentIDs
message.attachmentIDs = attachmentIDs.toMutableList()
// Update profile if needed
val newProfile = message.profile
if (newProfile != null) {
@ -194,7 +192,7 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
val messageID = storage.persist(message, quoteModel, linkPreviews, message.groupPublicKey, openGroupID) ?: throw MessageReceiver.Error.NoThread
message.threadID = threadID
// Start attachment downloads if needed
attachmentsToDownload.forEach { attachmentID ->
attachmentIDs.forEach { attachmentID ->
val downloadJob = AttachmentDownloadJob(attachmentID, messageID)
JobQueue.shared.add(downloadJob)
}

View File

@ -5,23 +5,20 @@ package org.session.libsession.messaging.sending_receiving
import com.google.protobuf.ByteString
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.messages.control.ClosedGroupControlMessage
import org.session.libsession.messaging.sending_receiving.notifications.PushNotificationAPI
import org.session.libsession.messaging.sending_receiving.MessageSender.Error
import org.session.libsession.messaging.sending_receiving.notifications.PushNotificationAPI
import org.session.libsession.messaging.threads.Address
import org.session.libsession.messaging.threads.recipients.Recipient
import org.session.libsession.utilities.GroupUtil
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsignal.utilities.Hex
import org.session.libsignal.libsignal.ecc.Curve
import org.session.libsignal.libsignal.ecc.ECKeyPair
import org.session.libsignal.libsignal.util.guava.Optional
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.service.loki.utilities.hexEncodedPublicKey
import org.session.libsignal.service.loki.utilities.removing05PrefixIfNeeded
import org.session.libsignal.utilities.Hex
import org.session.libsignal.utilities.ThreadUtils
import org.session.libsignal.utilities.logging.Log
import java.util.*
@ -252,6 +249,7 @@ fun MessageSender.generateAndSendNewEncryptionKeyPair(groupPublicKey: String, ta
sendNonDurably(closedGroupControlMessage, Address.fromSerialized(groupID)).success {
// Store it * after * having sent out the message to the group
storage.addClosedGroupEncryptionKeyPair(newKeyPair, groupPublicKey)
}.always {
pendingKeyPair[groupPublicKey] = Optional.absent()
}
}

View File

@ -2,25 +2,22 @@ package org.session.libsession.messaging.sending_receiving.pollers
import nl.komponents.kovenant.*
import nl.komponents.kovenant.functional.bind
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.utilities.MessageWrapper
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeConfiguration
import org.session.libsignal.service.loki.api.Snode
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.logging.Log
import java.security.SecureRandom
import java.util.*
private class PromiseCanceledException : Exception("Promise canceled.")
class Poller {
private val userPublicKey = MessagingConfiguration.shared.storage.getUserPublicKey() ?: ""
var userPublicKey = MessagingConfiguration.shared.storage.getUserPublicKey() ?: ""
private var hasStarted: Boolean = false
private val usedSnodes: MutableSet<Snode> = mutableSetOf()
public var isCaughtUp = false