hook up sending pipeline & clean

This commit is contained in:
Ryan ZHAO
2021-03-02 12:24:09 +11:00
parent 242eb90d21
commit d9eaedd6ae
79 changed files with 457 additions and 749 deletions

View File

@@ -0,0 +1,10 @@
package org.session.libsession.database.documents;
import java.util.List;
public interface Document<T> {
public int size();
public List<T> getList();
}

View File

@@ -0,0 +1,88 @@
package org.session.libsession.database.documents;
import org.session.libsignal.utilities.logging.Log;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.session.libsession.messaging.threads.Address;
import org.session.libsignal.utilities.Base64;
import org.session.libsignal.libsignal.IdentityKey;
import org.session.libsignal.libsignal.InvalidKeyException;
import java.io.IOException;
public class IdentityKeyMismatch {
private static final String TAG = IdentityKeyMismatch.class.getSimpleName();
@JsonProperty(value = "a")
private String address;
@JsonProperty(value = "k")
@JsonSerialize(using = IdentityKeySerializer.class)
@JsonDeserialize(using = IdentityKeyDeserializer.class)
private IdentityKey identityKey;
public IdentityKeyMismatch() {}
public IdentityKeyMismatch(Address address, IdentityKey identityKey) {
this.address = address.serialize();
this.identityKey = identityKey;
}
@JsonIgnore
public Address getAddress() {
return Address.fromSerialized(address);
}
public IdentityKey getIdentityKey() {
return identityKey;
}
@Override
public boolean equals(Object other) {
if (other == null || !(other instanceof IdentityKeyMismatch)) {
return false;
}
IdentityKeyMismatch that = (IdentityKeyMismatch)other;
return that.address.equals(this.address) && that.identityKey.equals(this.identityKey);
}
@Override
public int hashCode() {
return address.hashCode() ^ identityKey.hashCode();
}
private static class IdentityKeySerializer extends JsonSerializer<IdentityKey> {
@Override
public void serialize(IdentityKey value, JsonGenerator jsonGenerator, SerializerProvider serializers)
throws IOException
{
jsonGenerator.writeString(Base64.encodeBytes(value.serialize()));
}
}
private static class IdentityKeyDeserializer extends JsonDeserializer<IdentityKey> {
@Override
public IdentityKey deserialize(JsonParser jsonParser, DeserializationContext ctxt)
throws IOException
{
try {
return new IdentityKey(Base64.decode(jsonParser.getValueAsString()), 0);
} catch (InvalidKeyException e) {
Log.w(TAG, e);
throw new IOException(e);
}
}
}
}

View File

@@ -0,0 +1,33 @@
package org.session.libsession.database.documents;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.LinkedList;
import java.util.List;
public class IdentityKeyMismatchList implements Document<IdentityKeyMismatch> {
@JsonProperty(value = "m")
private List<IdentityKeyMismatch> mismatches;
public IdentityKeyMismatchList() {
this.mismatches = new LinkedList<>();
}
public IdentityKeyMismatchList(List<IdentityKeyMismatch> mismatches) {
this.mismatches = mismatches;
}
@Override
public int size() {
if (mismatches == null) return 0;
else return mismatches.size();
}
@Override
@JsonIgnore
public List<IdentityKeyMismatch> getList() {
return mismatches;
}
}

View File

@@ -0,0 +1,36 @@
package org.session.libsession.database.documents;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.session.libsession.messaging.threads.Address;
public class NetworkFailure {
@JsonProperty(value = "a")
private String address;
public NetworkFailure(Address address) {
this.address = address.serialize();
}
public NetworkFailure() {}
@JsonIgnore
public Address getAddress() {
return Address.fromSerialized(address);
}
@Override
public boolean equals(Object other) {
if (other == null || !(other instanceof NetworkFailure)) return false;
NetworkFailure that = (NetworkFailure)other;
return this.address.equals(that.address);
}
@Override
public int hashCode() {
return address.hashCode();
}
}

View File

@@ -0,0 +1,33 @@
package org.session.libsession.database.documents;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.LinkedList;
import java.util.List;
public class NetworkFailureList implements Document<NetworkFailure> {
@JsonProperty(value = "l")
private List<NetworkFailure> failures;
public NetworkFailureList() {
this.failures = new LinkedList<>();
}
public NetworkFailureList(List<NetworkFailure> failures) {
this.failures = failures;
}
@Override
public int size() {
if (failures == null) return 0;
else return failures.size();
}
@Override
@JsonIgnore
public List<NetworkFailure> getList() {
return failures;
}
}

View File

@@ -95,8 +95,8 @@ interface StorageProtocol {
fun getMessageIdInDatabase(timestamp: Long, author: String): Long?
fun setOpenGroupServerMessageID(messageID: Long, serverID: Long)
fun markAsSent(messageID: Long)
fun markUnidentified(messageID: Long)
fun markAsSent(timestamp: Long, author: String)
fun markUnidentified(timestamp: Long, author: String)
fun setErrorMessage(messageID: Long, error: Exception)
// Closed Groups

View File

@@ -28,7 +28,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
// Settings
override val maxFailureCount: Int = 20
companion object {
val TAG = AttachmentUploadJob::class.qualifiedName
val TAG = AttachmentUploadJob::class.simpleName
val KEY: String = "AttachmentUploadJob"
val maxFailureCount: Int = 20

View File

@@ -15,8 +15,8 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val
// Settings
override val maxFailureCount: Int = 10
companion object {
val TAG = MessageReceiveJob::class.qualifiedName
val KEY: String = "AttachmentUploadJob"
val TAG = MessageReceiveJob::class.simpleName
val KEY: String = "MessageReceiveJob"
//keys used for database storage purpose
private val KEY_DATA = "data"

View File

@@ -19,7 +19,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
// Settings
override val maxFailureCount: Int = 10
companion object {
val TAG = MessageSendJob::class.qualifiedName
val TAG = MessageSendJob::class.simpleName
val KEY: String = "MessageSendJob"
//keys used for database storage purpose

View File

@@ -18,8 +18,12 @@ abstract class Message {
// validation
open fun isValid(): Boolean {
sentTimestamp = if (sentTimestamp!! > 0) sentTimestamp else return false
receivedTimestamp = if (receivedTimestamp!! > 0) receivedTimestamp else return false
sentTimestamp?.let {
if (it <= 0) return false
}
receivedTimestamp?.let {
if (it <= 0) return false
}
return sender != null && recipient != null
}

View File

@@ -5,6 +5,7 @@ import org.session.libsignal.service.internal.push.SignalServiceProtos
class ExpirationTimerUpdate() : ControlMessage() {
var syncTarget: String? = null
var duration: Int? = 0
companion object {
@@ -12,7 +13,7 @@ class ExpirationTimerUpdate() : ControlMessage() {
fun fromProto(proto: SignalServiceProtos.Content): ExpirationTimerUpdate? {
val dataMessageProto = proto.dataMessage ?: return null
val isExpirationTimerUpdate = (dataMessageProto.flags and SignalServiceProtos.DataMessage.Flags.EXPIRATION_TIMER_UPDATE_VALUE) != 0 //TODO validate that 'and' operator equivalent to Swift '&'
val isExpirationTimerUpdate = dataMessageProto.flags.and(SignalServiceProtos.DataMessage.Flags.EXPIRATION_TIMER_UPDATE_VALUE) != 0
if (!isExpirationTimerUpdate) return null
val duration = dataMessageProto.expireTimer
return ExpirationTimerUpdate(duration)

View File

@@ -0,0 +1,13 @@
package org.session.libsession.messaging.messages.signal;
public class IncomingEncryptedMessage extends IncomingTextMessage {
public IncomingEncryptedMessage(IncomingTextMessage base, String newBody) {
super(base, newBody);
}
@Override
public boolean isSecureMessage() {
return true;
}
}

View File

@@ -0,0 +1,27 @@
package org.session.libsession.messaging.messages.signal;
import static org.session.libsignal.service.internal.push.SignalServiceProtos.GroupContext;
public class IncomingGroupMessage extends IncomingTextMessage {
private final GroupContext groupContext;
public IncomingGroupMessage(IncomingTextMessage base, GroupContext groupContext, String body) {
super(base, body);
this.groupContext = groupContext;
}
@Override
public boolean isGroup() {
return true;
}
public boolean isUpdate() {
return groupContext.getType().getNumber() == GroupContext.Type.UPDATE_VALUE;
}
public boolean isQuit() {
return groupContext.getType().getNumber() == GroupContext.Type.QUIT_VALUE;
}
}

View File

@@ -0,0 +1,185 @@
package org.session.libsession.messaging.messages.signal;
import android.os.Parcel;
import android.os.Parcelable;
import androidx.annotation.Nullable;
import org.session.libsession.messaging.messages.visible.VisibleMessage;
import org.session.libsession.messaging.threads.Address;
import org.session.libsession.utilities.GroupUtil;
import org.session.libsignal.libsignal.util.guava.Optional;
import org.session.libsignal.service.api.messages.SignalServiceGroup;
public class IncomingTextMessage implements Parcelable {
public static final Parcelable.Creator<IncomingTextMessage> CREATOR = new Parcelable.Creator<IncomingTextMessage>() {
@Override
public IncomingTextMessage createFromParcel(Parcel in) {
return new IncomingTextMessage(in);
}
@Override
public IncomingTextMessage[] newArray(int size) {
return new IncomingTextMessage[size];
}
};
private static final String TAG = IncomingTextMessage.class.getSimpleName();
private final String message;
private Address sender;
private final int senderDeviceId;
private final int protocol;
private final String serviceCenterAddress;
private final boolean replyPathPresent;
private final String pseudoSubject;
private final long sentTimestampMillis;
private final Address groupId;
private final boolean push;
private final int subscriptionId;
private final long expiresInMillis;
private final boolean unidentified;
public IncomingTextMessage(Address sender, int senderDeviceId, long sentTimestampMillis,
String encodedBody, Optional<SignalServiceGroup> group,
long expiresInMillis, boolean unidentified)
{
this.message = encodedBody;
this.sender = sender;
this.senderDeviceId = senderDeviceId;
this.protocol = 31337;
this.serviceCenterAddress = "GCM";
this.replyPathPresent = true;
this.pseudoSubject = "";
this.sentTimestampMillis = sentTimestampMillis;
this.push = true;
this.subscriptionId = -1;
this.expiresInMillis = expiresInMillis;
this.unidentified = unidentified;
if (group.isPresent()) {
this.groupId = Address.fromSerialized(GroupUtil.getEncodedId(group.get()));
} else {
this.groupId = null;
}
}
public IncomingTextMessage(Parcel in) {
this.message = in.readString();
this.sender = in.readParcelable(IncomingTextMessage.class.getClassLoader());
this.senderDeviceId = in.readInt();
this.protocol = in.readInt();
this.serviceCenterAddress = in.readString();
this.replyPathPresent = (in.readInt() == 1);
this.pseudoSubject = in.readString();
this.sentTimestampMillis = in.readLong();
this.groupId = in.readParcelable(IncomingTextMessage.class.getClassLoader());
this.push = (in.readInt() == 1);
this.subscriptionId = in.readInt();
this.expiresInMillis = in.readLong();
this.unidentified = in.readInt() == 1;
}
public IncomingTextMessage(IncomingTextMessage base, String newBody) {
this.message = newBody;
this.sender = base.getSender();
this.senderDeviceId = base.getSenderDeviceId();
this.protocol = base.getProtocol();
this.serviceCenterAddress = base.getServiceCenterAddress();
this.replyPathPresent = base.isReplyPathPresent();
this.pseudoSubject = base.getPseudoSubject();
this.sentTimestampMillis = base.getSentTimestampMillis();
this.groupId = base.getGroupId();
this.push = base.isPush();
this.subscriptionId = base.getSubscriptionId();
this.expiresInMillis = base.getExpiresIn();
this.unidentified = base.isUnidentified();
}
public static IncomingTextMessage from(VisibleMessage message,
Address sender,
Optional<SignalServiceGroup> group,
long expiresInMillis)
{
return new IncomingTextMessage(sender, 1, message.getReceivedTimestamp(), message.getText(), group, expiresInMillis, false);
}
public int getSubscriptionId() {
return subscriptionId;
}
public long getExpiresIn() {
return expiresInMillis;
}
public long getSentTimestampMillis() {
return sentTimestampMillis;
}
public String getPseudoSubject() {
return pseudoSubject;
}
public String getMessageBody() {
return message;
}
public Address getSender() {
return sender;
}
public int getSenderDeviceId() {
return senderDeviceId;
}
public int getProtocol() {
return protocol;
}
public String getServiceCenterAddress() {
return serviceCenterAddress;
}
public boolean isReplyPathPresent() {
return replyPathPresent;
}
public boolean isSecureMessage() {
return false;
}
public boolean isPush() {
return push;
}
public @Nullable Address getGroupId() {
return groupId;
}
public boolean isGroup() {
return false;
}
public boolean isUnidentified() {
return unidentified;
}
@Override
public int describeContents() {
return 0;
}
@Override
public void writeToParcel(Parcel out, int flags) {
out.writeString(message);
out.writeParcelable(sender, flags);
out.writeInt(senderDeviceId);
out.writeInt(protocol);
out.writeString(serviceCenterAddress);
out.writeInt(replyPathPresent ? 1 : 0);
out.writeString(pseudoSubject);
out.writeLong(sentTimestampMillis);
out.writeParcelable(groupId, flags);
out.writeInt(push ? 1 : 0);
out.writeInt(subscriptionId);
out.writeInt(unidentified ? 1 : 0);
}
}

View File

@@ -0,0 +1,43 @@
package org.session.libsession.messaging.messages.signal;
import org.session.libsession.messaging.messages.visible.VisibleMessage;
import org.session.libsession.messaging.threads.recipients.Recipient;
public class OutgoingTextMessage {
private final Recipient recipient;
private final String message;
private final int subscriptionId;
private final long expiresIn;
public OutgoingTextMessage(Recipient recipient, String message, long expiresIn, int subscriptionId) {
this.recipient = recipient;
this.message = message;
this.expiresIn = expiresIn;
this.subscriptionId = subscriptionId;
}
public static OutgoingTextMessage from(VisibleMessage message, Recipient recipient) {
return new OutgoingTextMessage(recipient, message.getText(), recipient.getExpireMessages() * 1000, -1);
}
public long getExpiresIn() {
return expiresIn;
}
public int getSubscriptionId() {
return subscriptionId;
}
public String getMessageBody() {
return message;
}
public Recipient getRecipient() {
return recipient;
}
public boolean isSecureMessage() {
return true;
}
}

View File

@@ -32,7 +32,7 @@ class Attachment {
result.contentType = proto.contentType ?: inferContentType()
result.key = proto.key.toByteArray()
result.digest = proto.digest.toByteArray()
val kind: Kind = if (proto.hasFlags() && (proto.flags and SignalServiceProtos.AttachmentPointer.Flags.VOICE_MESSAGE_VALUE) > 0) { //TODO validate that 'and' operator = swift '&'
val kind: Kind = if (proto.hasFlags() && proto.flags.and(SignalServiceProtos.AttachmentPointer.Flags.VOICE_MESSAGE_VALUE) > 0) {
Kind.VOICE_MESSAGE
} else {
Kind.GENERIC
@@ -42,7 +42,7 @@ class Attachment {
val size: Size = if (proto.hasWidth() && proto.width > 0 && proto.hasHeight() && proto.height > 0) {
Size(proto.width, proto.height)
} else {
Size(0,0) //TODO check that it's equivalent to swift: CGSize.zero
Size(0,0)
}
result.size = size
result.sizeInBytes = if (proto.size > 0) proto.size else null

View File

@@ -1,6 +1,7 @@
package org.session.libsession.messaging.messages.visible
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.sending_receiving.linkpreview.LinkPreview as SignalLinkPreiview
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.service.internal.push.SignalServiceProtos
@@ -18,6 +19,14 @@ class LinkPreview() {
val url = proto.url
return LinkPreview(title, url, null)
}
fun from(signalLinkPreview: SignalLinkPreiview?): LinkPreview? {
return if (signalLinkPreview == null) {
null
} else {
LinkPreview(signalLinkPreview.title, signalLinkPreview.url, signalLinkPreview.attachmentId?.rowId)
}
}
}
//constructor

View File

@@ -2,6 +2,8 @@ package org.session.libsession.messaging.messages.visible
import com.goterl.lazycode.lazysodium.BuildConfig
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment
import org.session.libsession.messaging.sending_receiving.quotes.QuoteModel as SignalQuote
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.service.internal.push.SignalServiceProtos
@@ -21,6 +23,15 @@ class Quote() {
val text = proto.text
return Quote(timestamp, publicKey, text, null)
}
fun from(signalQuote: SignalQuote?): Quote? {
return if (signalQuote == null) {
null
} else {
val attachmentID = (signalQuote.attachments?.firstOrNull() as? DatabaseAttachment)?.attachmentId?.rowId
Quote(signalQuote.id, signalQuote.author.serialize(), signalQuote.text, attachmentID)
}
}
}
//constructor

View File

@@ -4,6 +4,8 @@ import com.goterl.lazycode.lazysodium.BuildConfig
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment
import org.session.libsession.messaging.sending_receiving.attachments.Attachment as SignalAttachment
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.service.internal.push.SignalServiceProtos
@@ -46,6 +48,14 @@ class VisibleMessage : Message() {
}
}
fun addSignalAttachments(signalAttachments: List<SignalAttachment>) {
val attachmentIDs = signalAttachments.map {
val databaseAttachment = it as DatabaseAttachment
databaseAttachment.attachmentId.rowId
}
this.attachmentIDs = attachmentIDs as ArrayList<Long>
}
fun isMediaMessage(): Boolean {
return attachmentIDs.isNotEmpty() || quote != null || linkPreview != null || contact != null
}

View File

@@ -5,27 +5,31 @@ import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageSendJob
import org.session.libsession.messaging.jobs.NotifyPNServerJob
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.control.ClosedGroupControlMessage
import org.session.libsession.messaging.messages.control.ConfigurationMessage
import org.session.libsession.messaging.messages.visible.Attachment
import org.session.libsession.messaging.messages.visible.Profile
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
import org.session.libsession.messaging.messages.visible.*
import org.session.libsession.messaging.sending_receiving.attachments.Attachment as SignalAttachment
import org.session.libsession.messaging.sending_receiving.linkpreview.LinkPreview as SignalLinkPreview
import org.session.libsession.messaging.sending_receiving.quotes.QuoteModel as SignalQuote
import org.session.libsession.messaging.opengroups.OpenGroupAPI
import org.session.libsession.messaging.opengroups.OpenGroupMessage
import org.session.libsession.messaging.threads.Address
import org.session.libsession.messaging.utilities.MessageWrapper
import org.session.libsession.snode.RawResponsePromise
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeConfiguration
import org.session.libsession.snode.SnodeMessage
import org.session.libsession.utilities.SSKEnvironment
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.service.api.messages.SignalServiceAttachment
import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.utilities.Base64
import org.session.libsignal.service.loki.api.crypto.ProofOfWork
import org.session.libsignal.service.loki.utilities.hexEncodedPublicKey
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.logging.Log
object MessageSender {
@@ -56,25 +60,22 @@ object MessageSender {
}
// Preparation
fun prep(signalAttachments: List<SignalServiceAttachment>, message: VisibleMessage) {
// TODO: Deal with SignalServiceAttachmentStream
fun prep(signalAttachments: List<SignalAttachment>, message: VisibleMessage) {
val attachments = mutableListOf<Attachment>()
for (signalAttachment in signalAttachments) {
val attachment = Attachment()
if (signalAttachment.isPointer) {
val signalAttachmentPointer = signalAttachment.asPointer()
attachment.fileName = signalAttachmentPointer.fileName.orNull()
attachment.caption = signalAttachmentPointer.caption.orNull()
attachment.contentType = signalAttachmentPointer.contentType
attachment.digest = signalAttachmentPointer.digest.orNull()
attachment.key = signalAttachmentPointer.key
attachment.sizeInBytes = signalAttachmentPointer.size.orNull()
attachment.url = signalAttachmentPointer.url
attachment.size = Size(signalAttachmentPointer.width, signalAttachmentPointer.height)
attachments.add(attachment)
}
attachment.fileName = signalAttachment.fileName
attachment.caption = signalAttachment.caption
attachment.contentType = signalAttachment.contentType
attachment.digest = signalAttachment.digest
attachment.key = Base64.decode(signalAttachment.key)
attachment.sizeInBytes = signalAttachment.size.toInt()
attachment.url = signalAttachment.url
attachment.size = Size(signalAttachment.width, signalAttachment.height)
attachments.add(attachment)
}
val attachmentIDs = MessagingConfiguration.shared.storage.persistAttachments(message.id ?: 0, attachments)
val attachmentIDs = MessagingConfiguration.shared.storage.persistAttachments(message.id
?: 0, attachments)
message.attachmentIDs.addAll(attachmentIDs)
}
@@ -170,7 +171,7 @@ object MessageSender {
val wrappedMessage = MessageWrapper.wrap(kind, message.sentTimestamp!!, senderPublicKey, ciphertext)
// Calculate proof of work
if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend) {
//TODO Notify user for proof of work calculating
SnodeConfiguration.shared.broadcaster.broadcast("calculatingPoW", message.sentTimestamp!!)
}
val recipient = message.recipient!!
val base64EncodedData = Base64.encodeBytes(wrappedMessage)
@@ -178,6 +179,9 @@ object MessageSender {
val nonce = ProofOfWork.calculate(base64EncodedData, recipient, timestamp, message.ttl.toInt()) ?: throw Error.ProofOfWorkCalculationFailed
// Send the result
snodeMessage = SnodeMessage(recipient, base64EncodedData, message.ttl, timestamp, nonce)
if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend) {
SnodeConfiguration.shared.broadcaster.broadcast("sendingMessage", message.sentTimestamp!!)
}
SnodeAPI.sendMessage(snodeMessage).success { promises: Set<RawResponsePromise> ->
var isSuccess = false
val promiseCount = promises.size
@@ -187,7 +191,7 @@ object MessageSender {
if (isSuccess) { return@success } // Succeed as soon as the first promise succeeds
isSuccess = true
if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend) {
//TODO Notify user for message sent
SnodeConfiguration.shared.broadcaster.broadcast("messageSent", message.sentTimestamp!!)
}
handleSuccessfulMessageSend(message, destination, isSyncMessage)
var shouldNotify = (message is VisibleMessage && !isSyncMessage)
@@ -199,7 +203,6 @@ object MessageSender {
JobQueue.shared.add(notifyPNServerJob)
deferred.resolve(Unit)
}
}
promise.fail {
errorCount += 1
@@ -277,17 +280,19 @@ object MessageSender {
storage.setOpenGroupServerMessageID(messageId, message.openGroupServerMessageID!!)
}
// Mark the message as sent
storage.markAsSent(messageId)
storage.markUnidentified(messageId)
storage.markAsSent(message.sentTimestamp!!, message.sender!!)
storage.markUnidentified(message.sentTimestamp!!, message.sender!!)
// Start the disappearing messages timer if needed
SSKEnvironment.shared.messageExpirationManager.startAnyExpiration(messageId)
SSKEnvironment.shared.messageExpirationManager.startAnyExpiration(message.sentTimestamp!!, message.sender!!)
// Sync the message if:
// • it's a visible message
// • the destination was a contact
// • we didn't sync it already
val userPublicKey = storage.getUserPublicKey()!!
if (destination is Destination.Contact && !isSyncMessage && message is VisibleMessage) {
sendToSnodeDestination(Destination.Contact(userPublicKey), message, true).get()
if (destination is Destination.Contact && !isSyncMessage) {
if (message is VisibleMessage) { message.syncTarget = destination.publicKey }
if (message is ExpirationTimerUpdate) { message.syncTarget = destination.publicKey }
sendToSnodeDestination(Destination.Contact(userPublicKey), message, true)
}
}
@@ -295,5 +300,36 @@ object MessageSender {
val storage = MessagingConfiguration.shared.storage
val messageId = storage.getMessageIdInDatabase(message.sentTimestamp!!, message.sender!!) ?: return
storage.setErrorMessage(messageId, error)
SnodeConfiguration.shared.broadcaster.broadcast("messageFailed", message.sentTimestamp!!)
}
// Convenience
@JvmStatic
fun send(message: VisibleMessage, address: Address, attachments: List<SignalAttachment>, quote: SignalQuote?, linkPreview: SignalLinkPreview?) {
prep(attachments, message)
message.quote = Quote.from(quote)
message.linkPreview = LinkPreview.from(linkPreview)
send(message, address)
}
@JvmStatic
fun send(message: Message, address: Address) {
val threadID = MessagingConfiguration.shared.storage.getOrCreateThreadIdFor(address)
message.threadID = threadID
val destination = Destination.from(address)
val job = MessageSendJob(message, destination)
JobQueue.shared.add(job)
}
fun sendNonDurably(message: VisibleMessage, attachments: List<SignalAttachment>, address: Address): Promise<Unit, Exception> {
prep(attachments, message)
return sendNonDurably(message, address)
}
fun sendNonDurably(message: Message, address: Address): Promise<Unit, Exception> {
val threadID = MessagingConfiguration.shared.storage.getOrCreateThreadIdFor(address)
message.threadID = threadID
val destination = Destination.from(address)
return send(message, destination)
}
}

View File

@@ -1,38 +0,0 @@
package org.session.libsession.messaging.sending_receiving
import nl.komponents.kovenant.Promise
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageSendJob
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.messaging.threads.Address
import org.session.libsignal.service.api.messages.SignalServiceAttachment
fun MessageSender.send(message: VisibleMessage, attachments: List<SignalServiceAttachment>, address: Address) {
prep(attachments, message)
send(message, address)
}
fun MessageSender.send(message: Message, address: Address) {
val threadID = MessagingConfiguration.shared.storage.getOrCreateThreadIdFor(address)
message.threadID = threadID
val destination = Destination.from(address)
val job = MessageSendJob(message, destination)
JobQueue.shared.add(job)
}
fun MessageSender.sendNonDurably(message: VisibleMessage, attachments: List<SignalServiceAttachment>, address: Address): Promise<Unit, Exception> {
prep(attachments, message)
return sendNonDurably(message, address)
}
fun MessageSender.sendNonDurably(message: Message, address: Address): Promise<Unit, Exception> {
val threadID = MessagingConfiguration.shared.storage.getOrCreateThreadIdFor(address)
message.threadID = threadID
val destination = Destination.from(address)
return MessageSender.send(message, destination)
}

View File

@@ -12,10 +12,10 @@ import org.session.libsession.utilities.Util;
public class AttachmentId implements Parcelable {
@JsonProperty
private final long rowId;
private final long rowId; // This is the field id in the database
@JsonProperty
private final long uniqueId;
private final long uniqueId; // This is the timestamp when the attachment is written into the database
public AttachmentId(@JsonProperty("rowId") long rowId, @JsonProperty("uniqueId") long uniqueId) {
this.rowId = rowId;

View File

@@ -7,10 +7,10 @@ import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.utilities.MessageWrapper
import org.session.libsession.snode.Snode
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeConfiguration
import org.session.libsignal.service.loki.api.Snode
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.utilities.Base64

View File

@@ -0,0 +1,9 @@
package org.session.libsession.messaging.threads
object DistributionTypes {
const val DEFAULT = 2
const val BROADCAST = 1
const val CONVERSATION = 2
const val ARCHIVE = 3
const val INBOX_ZERO = 4
}

View File

@@ -10,7 +10,7 @@ import org.session.libsession.utilities.AESGCM
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.*
import org.session.libsignal.service.loki.api.*
import org.session.libsignal.service.loki.api.Snode
import org.session.libsignal.service.loki.api.fileserver.FileServerAPI
import org.session.libsignal.service.loki.api.utilities.*
import org.session.libsession.utilities.AESGCM.EncryptionResult
@@ -74,7 +74,7 @@ object OnionRequestAPI {
)
internal sealed class Destination {
class Snode(val snode: org.session.libsession.snode.Snode) : Destination()
class Snode(val snode: org.session.libsignal.service.loki.api.Snode) : Destination()
class Server(val host: String, val target: String, val x25519PublicKey: String) : Destination()
}

View File

@@ -1,6 +1,6 @@
package org.session.libsession.snode
public class Snode(val address: String, val port: Int, val publicKeySet: KeySet?) {
class Snode(val address: String, val port: Int, val publicKeySet: KeySet?) {
val ip: String get() = address.removePrefix("https://")

View File

@@ -10,6 +10,9 @@ import org.session.libsession.snode.utilities.getRandomElement
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.service.loki.api.utilities.HTTP
import org.session.libsignal.service.loki.api.Snode
import org.session.libsignal.service.loki.database.LokiAPIDatabaseProtocol
import org.session.libsignal.service.loki.utilities.Broadcaster
import org.session.libsignal.service.loki.utilities.prettifiedDescription
import org.session.libsignal.service.loki.utilities.retryIfNeeded
import org.session.libsignal.utilities.*
@@ -17,10 +20,11 @@ import org.session.libsignal.utilities.*
import java.security.SecureRandom
object SnodeAPI {
val database = SnodeConfiguration.shared.storage
val broadcaster = SnodeConfiguration.shared.broadcaster
val database: LokiAPIDatabaseProtocol
get() = SnodeConfiguration.shared.storage
val broadcaster: Broadcaster
get() = SnodeConfiguration.shared.broadcaster
val sharedContext = Kovenant.createContext()
val messageSendingContext = Kovenant.createContext()
val messagePollingContext = Kovenant.createContext()
internal var snodeFailureCount: MutableMap<Snode, Int> = mutableMapOf()
@@ -158,7 +162,7 @@ object SnodeAPI {
val parameters = mapOf( "pubKey" to publicKey )
return getRandomSnode().bind {
invoke(Snode.Method.GetSwarm, it, publicKey, parameters)
}.map(SnodeAPI.sharedContext) {
}.map(sharedContext) {
parseSnodes(it).toSet()
}.success {
database.setSwarm(publicKey, it)
@@ -182,19 +186,12 @@ object SnodeAPI {
fun sendMessage(message: SnodeMessage): Promise<Set<RawResponsePromise>, Exception> {
val destination = message.recipient
fun broadcast(event: String) {
val dayInMs: Long = 86400000
if (message.ttl != dayInMs && message.ttl != 4 * dayInMs) { return }
broadcaster.broadcast(event, message.timestamp)
}
broadcast("calculatingPoW")
return retryIfNeeded(maxRetryCount) {
getTargetSnodes(destination).map(messageSendingContext) { swarm ->
getTargetSnodes(destination).map { swarm ->
swarm.map { snode ->
broadcast("sendingMessage")
val parameters = message.toJSON()
retryIfNeeded(maxRetryCount) {
invoke(Snode.Method.SendMessage, snode, destination, parameters).map(messageSendingContext) { rawResponse ->
invoke(Snode.Method.SendMessage, snode, destination, parameters).map { rawResponse ->
val json = rawResponse as? Map<*, *>
val powDifficulty = json?.get("difficulty") as? Int
if (powDifficulty != null) {

View File

@@ -1,12 +1,13 @@
package org.session.libsession.snode
import org.session.libsignal.service.loki.database.LokiAPIDatabaseProtocol
import org.session.libsignal.service.loki.utilities.Broadcaster
class SnodeConfiguration(val storage: SnodeStorageProtocol, val broadcaster: Broadcaster) {
class SnodeConfiguration(val storage: LokiAPIDatabaseProtocol, val broadcaster: Broadcaster) {
companion object {
lateinit var shared: SnodeConfiguration
fun configure(storage: SnodeStorageProtocol, broadcaster: Broadcaster) {
fun configure(storage: LokiAPIDatabaseProtocol, broadcaster: Broadcaster) {
if (Companion::shared.isInitialized) { return }
shared = SnodeConfiguration(storage, broadcaster)
}

View File

@@ -38,7 +38,7 @@ class SSKEnvironment(
interface MessageExpirationManagerProtocol {
fun setExpirationTimer(messageID: Long?, duration: Int, senderPublicKey: String, content: SignalServiceProtos.Content)
fun disableExpirationTimer(messageID: Long?, senderPublicKey: String, content: SignalServiceProtos.Content)
fun startAnyExpiration(messageID: Long)
fun startAnyExpiration(timestamp: Long, author: String)
}
companion object {