mirror of
https://github.com/oxen-io/session-android.git
synced 2025-03-16 06:20:54 +00:00
Refactor PushGroupSendJob
This commit is contained in:
parent
03f2c76344
commit
10865adba8
@ -12,31 +12,26 @@ import org.thoughtcrime.securesms.ApplicationContext;
|
|||||||
import org.thoughtcrime.securesms.attachments.Attachment;
|
import org.thoughtcrime.securesms.attachments.Attachment;
|
||||||
import org.thoughtcrime.securesms.attachments.DatabaseAttachment;
|
import org.thoughtcrime.securesms.attachments.DatabaseAttachment;
|
||||||
import org.thoughtcrime.securesms.crypto.UnidentifiedAccessUtil;
|
import org.thoughtcrime.securesms.crypto.UnidentifiedAccessUtil;
|
||||||
import org.thoughtcrime.securesms.crypto.storage.TextSecureSessionStore;
|
|
||||||
import org.thoughtcrime.securesms.database.Address;
|
import org.thoughtcrime.securesms.database.Address;
|
||||||
import org.thoughtcrime.securesms.database.DatabaseFactory;
|
import org.thoughtcrime.securesms.database.DatabaseFactory;
|
||||||
import org.thoughtcrime.securesms.database.GroupReceiptDatabase.GroupReceiptInfo;
|
|
||||||
import org.thoughtcrime.securesms.database.MmsDatabase;
|
import org.thoughtcrime.securesms.database.MmsDatabase;
|
||||||
import org.thoughtcrime.securesms.database.NoSuchMessageException;
|
import org.thoughtcrime.securesms.database.NoSuchMessageException;
|
||||||
import org.thoughtcrime.securesms.database.documents.IdentityKeyMismatch;
|
import org.thoughtcrime.securesms.database.documents.IdentityKeyMismatch;
|
||||||
import org.thoughtcrime.securesms.database.documents.NetworkFailure;
|
import org.thoughtcrime.securesms.database.documents.NetworkFailure;
|
||||||
import org.thoughtcrime.securesms.dependencies.InjectableType;
|
import org.thoughtcrime.securesms.dependencies.InjectableType;
|
||||||
import org.thoughtcrime.securesms.groups.GroupManager;
|
|
||||||
import org.thoughtcrime.securesms.jobmanager.Data;
|
import org.thoughtcrime.securesms.jobmanager.Data;
|
||||||
import org.thoughtcrime.securesms.jobmanager.Job;
|
import org.thoughtcrime.securesms.jobmanager.Job;
|
||||||
import org.thoughtcrime.securesms.jobmanager.JobManager;
|
import org.thoughtcrime.securesms.jobmanager.JobManager;
|
||||||
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
|
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
|
||||||
import org.thoughtcrime.securesms.logging.Log;
|
import org.thoughtcrime.securesms.logging.Log;
|
||||||
|
import org.thoughtcrime.securesms.loki.protocol.ClosedGroupsProtocol;
|
||||||
import org.thoughtcrime.securesms.mms.MmsException;
|
import org.thoughtcrime.securesms.mms.MmsException;
|
||||||
import org.thoughtcrime.securesms.mms.OutgoingGroupMediaMessage;
|
import org.thoughtcrime.securesms.mms.OutgoingGroupMediaMessage;
|
||||||
import org.thoughtcrime.securesms.mms.OutgoingMediaMessage;
|
import org.thoughtcrime.securesms.mms.OutgoingMediaMessage;
|
||||||
import org.thoughtcrime.securesms.recipients.Recipient;
|
import org.thoughtcrime.securesms.recipients.Recipient;
|
||||||
import org.thoughtcrime.securesms.sms.MessageSender;
|
|
||||||
import org.thoughtcrime.securesms.transport.RetryLaterException;
|
import org.thoughtcrime.securesms.transport.RetryLaterException;
|
||||||
import org.thoughtcrime.securesms.transport.UndeliverableMessageException;
|
import org.thoughtcrime.securesms.transport.UndeliverableMessageException;
|
||||||
import org.thoughtcrime.securesms.util.GroupUtil;
|
import org.thoughtcrime.securesms.util.GroupUtil;
|
||||||
import org.thoughtcrime.securesms.util.TextSecurePreferences;
|
|
||||||
import org.whispersystems.libsignal.SignalProtocolAddress;
|
|
||||||
import org.whispersystems.libsignal.util.guava.Optional;
|
import org.whispersystems.libsignal.util.guava.Optional;
|
||||||
import org.whispersystems.signalservice.api.SignalServiceMessageSender;
|
import org.whispersystems.signalservice.api.SignalServiceMessageSender;
|
||||||
import org.whispersystems.signalservice.api.crypto.UnidentifiedAccessPair;
|
import org.whispersystems.signalservice.api.crypto.UnidentifiedAccessPair;
|
||||||
@ -50,14 +45,10 @@ import org.whispersystems.signalservice.api.messages.SignalServiceGroup;
|
|||||||
import org.whispersystems.signalservice.api.messages.shared.SharedContact;
|
import org.whispersystems.signalservice.api.messages.shared.SharedContact;
|
||||||
import org.whispersystems.signalservice.api.push.SignalServiceAddress;
|
import org.whispersystems.signalservice.api.push.SignalServiceAddress;
|
||||||
import org.whispersystems.signalservice.internal.push.SignalServiceProtos.GroupContext;
|
import org.whispersystems.signalservice.internal.push.SignalServiceProtos.GroupContext;
|
||||||
import org.whispersystems.signalservice.loki.protocol.multidevice.LokiDeviceLinkUtilities;
|
|
||||||
import org.whispersystems.signalservice.loki.api.opengroups.LokiPublicChat;
|
|
||||||
import org.whispersystems.signalservice.loki.utilities.PromiseUtil;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
@ -159,36 +150,13 @@ public class PushGroupSendJob extends PushSendJob implements InjectableType {
|
|||||||
try {
|
try {
|
||||||
log(TAG, "Sending message: " + messageId);
|
log(TAG, "Sending message: " + messageId);
|
||||||
|
|
||||||
List<Address> target;
|
List<Address> targets;
|
||||||
|
|
||||||
if (filterAddress != null) target = Collections.singletonList(Address.fromSerialized(filterAddress));
|
if (filterAddress != null) targets = Collections.singletonList(Address.fromSerialized(filterAddress));
|
||||||
else if (!existingNetworkFailures.isEmpty()) target = Stream.of(existingNetworkFailures).map(NetworkFailure::getAddress).toList();
|
else if (!existingNetworkFailures.isEmpty()) targets = Stream.of(existingNetworkFailures).map(NetworkFailure::getAddress).toList();
|
||||||
else target = getGroupMessageRecipients(message.getRecipient().getAddress().toGroupString(), messageId);
|
else targets = getGroupMessageRecipients(message.getRecipient().getAddress().toGroupString(), messageId);
|
||||||
|
|
||||||
String localNumber = TextSecurePreferences.getLocalNumber(context);
|
List<SendMessageResult> results = deliver(message, targets);
|
||||||
|
|
||||||
// Only send messages to the contacts we have sessions with
|
|
||||||
List<Address> validTargets = Stream.of(target).filter(member -> {
|
|
||||||
if (member.isOpenGroup()) { return true; }
|
|
||||||
|
|
||||||
// Our device is always valid
|
|
||||||
if (member.serialize().equalsIgnoreCase(localNumber)) { return true; }
|
|
||||||
|
|
||||||
SignalProtocolAddress protocolAddress = new SignalProtocolAddress(member.toPhoneString(), SignalServiceAddress.DEFAULT_DEVICE_ID);
|
|
||||||
boolean hasSession = new TextSecureSessionStore(context).containsSession(protocolAddress);
|
|
||||||
if (hasSession) { return true; }
|
|
||||||
|
|
||||||
// We should allow sending if we have a prekeybundle for the contact
|
|
||||||
return DatabaseFactory.getLokiPreKeyBundleDatabase(context).hasPreKeyBundle(member.toPhoneString());
|
|
||||||
}).toList();
|
|
||||||
|
|
||||||
// Send a session request to the other devices
|
|
||||||
List<Address> others = Stream.of(target).filter(t -> !validTargets.contains(t)).toList();
|
|
||||||
for (Address device : others) {
|
|
||||||
MessageSender.sendBackgroundSessionRequest(context, device.toPhoneString());
|
|
||||||
}
|
|
||||||
|
|
||||||
List<SendMessageResult> results = deliver(message, validTargets);
|
|
||||||
List<NetworkFailure> networkFailures = Stream.of(results).filter(SendMessageResult::isNetworkFailure).map(result -> new NetworkFailure(Address.fromSerialized(result.getAddress().getNumber()))).toList();
|
List<NetworkFailure> networkFailures = Stream.of(results).filter(SendMessageResult::isNetworkFailure).map(result -> new NetworkFailure(Address.fromSerialized(result.getAddress().getNumber()))).toList();
|
||||||
List<IdentityKeyMismatch> identityMismatches = Stream.of(results).filter(result -> result.getIdentityFailure() != null).map(result -> new IdentityKeyMismatch(Address.fromSerialized(result.getAddress().getNumber()), result.getIdentityFailure().getIdentityKey())).toList();
|
List<IdentityKeyMismatch> identityMismatches = Stream.of(results).filter(result -> result.getIdentityFailure() != null).map(result -> new IdentityKeyMismatch(Address.fromSerialized(result.getAddress().getNumber()), result.getIdentityFailure().getIdentityKey())).toList();
|
||||||
Set<Address> successAddresses = Stream.of(results).filter(result -> result.getSuccess() != null).map(result -> Address.fromSerialized(result.getAddress().getNumber())).collect(Collectors.toSet());
|
Set<Address> successAddresses = Stream.of(results).filter(result -> result.getSuccess() != null).map(result -> Address.fromSerialized(result.getAddress().getNumber())).collect(Collectors.toSet());
|
||||||
@ -246,10 +214,8 @@ public class PushGroupSendJob extends PushSendJob implements InjectableType {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean onShouldRetry(@NonNull Exception exception) {
|
public boolean onShouldRetry(@NonNull Exception exception) {
|
||||||
if (exception instanceof IOException) return true;
|
if (exception instanceof IOException) return true;
|
||||||
|
|
||||||
// Loki - Disable since we have our own retrying
|
// Loki - Disable since we have our own retrying
|
||||||
// if (exception instanceof RetryLaterException) return true;
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -260,17 +226,16 @@ public class PushGroupSendJob extends PushSendJob implements InjectableType {
|
|||||||
|
|
||||||
private List<SendMessageResult> deliver(OutgoingMediaMessage message, @NonNull List<Address> destinations)
|
private List<SendMessageResult> deliver(OutgoingMediaMessage message, @NonNull List<Address> destinations)
|
||||||
throws IOException, UntrustedIdentityException, UndeliverableMessageException {
|
throws IOException, UntrustedIdentityException, UndeliverableMessageException {
|
||||||
// rotateSenderCertificateIfNecessary();
|
|
||||||
|
|
||||||
// Messages shouldn't be able to be sent to RSS Feeds
|
// Loki - The user shouldn't be able to message RSS feeds
|
||||||
Address groupAddress = message.getRecipient().getAddress();
|
Address address = message.getRecipient().getAddress();
|
||||||
if (groupAddress.isRSSFeed()) {
|
if (address.isRSSFeed()) {
|
||||||
List<SendMessageResult> results = new ArrayList<>();
|
List<SendMessageResult> results = new ArrayList<>();
|
||||||
for (Address destination : destinations) results.add(SendMessageResult.networkFailure(new SignalServiceAddress(destination.toPhoneString())));
|
for (Address destination : destinations) results.add(SendMessageResult.networkFailure(new SignalServiceAddress(destination.toPhoneString())));
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
String groupId = groupAddress.toGroupString();
|
String groupId = address.toGroupString();
|
||||||
Optional<byte[]> profileKey = getProfileKey(message.getRecipient());
|
Optional<byte[]> profileKey = getProfileKey(message.getRecipient());
|
||||||
Optional<Quote> quote = getQuoteFor(message);
|
Optional<Quote> quote = getQuoteFor(message);
|
||||||
Optional<SignalServiceDataMessage.Sticker> sticker = getStickerFor(message);
|
Optional<SignalServiceDataMessage.Sticker> sticker = getStickerFor(message);
|
||||||
@ -281,18 +246,15 @@ public class PushGroupSendJob extends PushSendJob implements InjectableType {
|
|||||||
List<SignalServiceAttachment> attachmentPointers = getAttachmentPointersFor(attachments);
|
List<SignalServiceAttachment> attachmentPointers = getAttachmentPointersFor(attachments);
|
||||||
|
|
||||||
List<Optional<UnidentifiedAccessPair>> unidentifiedAccess = Stream.of(addresses)
|
List<Optional<UnidentifiedAccessPair>> unidentifiedAccess = Stream.of(addresses)
|
||||||
.map(address -> Address.fromSerialized(address.getNumber()))
|
.map(a -> Address.fromSerialized(a.getNumber()))
|
||||||
.map(address -> Recipient.from(context, address, false))
|
.map(a -> Recipient.from(context, a, false))
|
||||||
.map(recipient -> UnidentifiedAccessUtil.getAccessFor(context, recipient))
|
.map(recipient -> UnidentifiedAccessUtil.getAccessFor(context, recipient))
|
||||||
.toList();
|
.toList();
|
||||||
|
|
||||||
SignalServiceGroup.GroupType groupType = SignalServiceGroup.GroupType.SIGNAL;
|
SignalServiceGroup.GroupType groupType = address.isOpenGroup() ? SignalServiceGroup.GroupType.PUBLIC_CHAT : SignalServiceGroup.GroupType.SIGNAL;
|
||||||
if (groupAddress.isOpenGroup()) {
|
|
||||||
groupType = SignalServiceGroup.GroupType.PUBLIC_CHAT;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (message.isGroup() && groupAddress.isClosedGroup()) {
|
if (message.isGroup() && address.isClosedGroup()) {
|
||||||
// Loki - Only send GroupUpdate or GroupQuit to signal groups
|
// Loki - Only send GroupUpdate or GroupQuit messages to closed groups
|
||||||
OutgoingGroupMediaMessage groupMessage = (OutgoingGroupMediaMessage) message;
|
OutgoingGroupMediaMessage groupMessage = (OutgoingGroupMediaMessage) message;
|
||||||
GroupContext groupContext = groupMessage.getGroupContext();
|
GroupContext groupContext = groupMessage.getGroupContext();
|
||||||
SignalServiceAttachment avatar = attachmentPointers.isEmpty() ? null : attachmentPointers.get(0);
|
SignalServiceAttachment avatar = attachmentPointers.isEmpty() ? null : attachmentPointers.get(0);
|
||||||
@ -327,62 +289,7 @@ public class PushGroupSendJob extends PushSendJob implements InjectableType {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private @NonNull List<Address> getGroupMessageRecipients(String groupId, long messageId) {
|
private @NonNull List<Address> getGroupMessageRecipients(String groupId, long messageId) {
|
||||||
if (GroupUtil.isRSSFeed(groupId)) { return new ArrayList<>(); }
|
return ClosedGroupsProtocol.getDestinations(groupId, context);
|
||||||
|
|
||||||
// Loki - All public chat group messages should be directed to their respective servers
|
|
||||||
if (GroupUtil.isOpenGroup(groupId)) {
|
|
||||||
ArrayList<Address> result = new ArrayList<>();
|
|
||||||
long threadID = GroupManager.getThreadIdFromGroupId(groupId, context);
|
|
||||||
LokiPublicChat publicChat = DatabaseFactory.getLokiThreadDatabase(context).getPublicChat(threadID);
|
|
||||||
if (publicChat != null) {
|
|
||||||
result.add(Address.fromSerialized(groupId));
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
} else {
|
|
||||||
/*
|
|
||||||
Our biggest assumption here is that group members will only consist of primary devices.
|
|
||||||
No secondary device should be able to be added to a group.
|
|
||||||
*/
|
|
||||||
List<GroupReceiptInfo> destinations = DatabaseFactory.getGroupReceiptDatabase(context).getGroupReceiptInfo(messageId);
|
|
||||||
|
|
||||||
Set<Address> memberSet = new HashSet<>();
|
|
||||||
if (destinations.isEmpty()) {
|
|
||||||
List<Recipient> groupMembers = DatabaseFactory.getGroupDatabase(context).getGroupMembers(groupId, false);
|
|
||||||
memberSet.addAll(Stream.of(groupMembers).map(Recipient::getAddress).toList());
|
|
||||||
} else {
|
|
||||||
memberSet.addAll(Stream.of(destinations).map(GroupReceiptInfo::getAddress).toList());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Replace primary device public key with ours so message syncing works correctly
|
|
||||||
String masterHexEncodedPublicKey = TextSecurePreferences.getMasterHexEncodedPublicKey(context);
|
|
||||||
String localNumber = TextSecurePreferences.getLocalNumber(context);
|
|
||||||
if (masterHexEncodedPublicKey != null && memberSet.contains(Address.fromSerialized(masterHexEncodedPublicKey))) {
|
|
||||||
memberSet.remove(Address.fromSerialized(masterHexEncodedPublicKey));
|
|
||||||
memberSet.add(Address.fromSerialized(localNumber));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add secondary devices to the list. We shouldn't add our secondary devices
|
|
||||||
try {
|
|
||||||
Set<Address> originalMemberSet = new HashSet<>(memberSet);
|
|
||||||
for (Address member : originalMemberSet) {
|
|
||||||
if (!member.isPhone() || member.serialize().equalsIgnoreCase(localNumber)) { continue; }
|
|
||||||
|
|
||||||
try {
|
|
||||||
Set<String> secondaryDevices = PromiseUtil.timeout(LokiDeviceLinkUtilities.INSTANCE.getSlaveHexEncodedPublicKeys(member.serialize()), 5000).get();
|
|
||||||
memberSet.addAll(Stream.of(secondaryDevices).map(string -> {
|
|
||||||
// Loki - Calling .map(Address::fromSerialized) is causing errors, thus we use the long method :(
|
|
||||||
return Address.fromSerialized(string);
|
|
||||||
}).toList());
|
|
||||||
} catch (Exception e) {
|
|
||||||
// Timed out, go to the next member
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
Log.e("PushGroupSend", "Error occurred while adding secondary devices: " + e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return new LinkedList<>(memberSet);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Factory implements Job.Factory<PushGroupSendJob> {
|
public static class Factory implements Job.Factory<PushGroupSendJob> {
|
||||||
|
@ -7,9 +7,35 @@ import org.thoughtcrime.securesms.recipients.Recipient
|
|||||||
import org.thoughtcrime.securesms.sms.MessageSender
|
import org.thoughtcrime.securesms.sms.MessageSender
|
||||||
import org.thoughtcrime.securesms.util.GroupUtil
|
import org.thoughtcrime.securesms.util.GroupUtil
|
||||||
import org.thoughtcrime.securesms.util.TextSecurePreferences
|
import org.thoughtcrime.securesms.util.TextSecurePreferences
|
||||||
|
import org.whispersystems.signalservice.loki.protocol.multidevice.MultiDeviceProtocol
|
||||||
|
|
||||||
object ClosedGroupsProtocol {
|
object ClosedGroupsProtocol {
|
||||||
|
|
||||||
|
@JvmStatic
|
||||||
|
fun getDestinations(groupID: String, context: Context): List<Address> {
|
||||||
|
if (GroupUtil.isRSSFeed(groupID)) { return listOf() }
|
||||||
|
if (GroupUtil.isOpenGroup(groupID)) {
|
||||||
|
val result = mutableListOf<Address>()
|
||||||
|
result.add(Address.fromSerialized(groupID))
|
||||||
|
return result
|
||||||
|
} else {
|
||||||
|
val members = DatabaseFactory.getGroupDatabase(context).getGroupMembers(groupID, false)
|
||||||
|
val recipients = members.flatMap { member ->
|
||||||
|
MultiDeviceProtocol.shared.getAllLinkedDevices(member.address.serialize()).map { Address.fromSerialized(it) }
|
||||||
|
}.toMutableSet()
|
||||||
|
val masterPublicKey = TextSecurePreferences.getMasterHexEncodedPublicKey(context)
|
||||||
|
if (masterPublicKey != null && recipients.contains(Address.fromSerialized(masterPublicKey))) {
|
||||||
|
recipients.remove(Address.fromSerialized(masterPublicKey))
|
||||||
|
}
|
||||||
|
val userPublicKey = TextSecurePreferences.getLocalNumber(context)
|
||||||
|
if (userPublicKey != null && recipients.contains(Address.fromSerialized(userPublicKey))) {
|
||||||
|
recipients.remove(Address.fromSerialized(userPublicKey))
|
||||||
|
}
|
||||||
|
return recipients.toList()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@JvmStatic
|
||||||
fun leaveGroup(context: Context, recipient: Recipient): Boolean {
|
fun leaveGroup(context: Context, recipient: Recipient): Boolean {
|
||||||
if (!recipient.address.isClosedGroup) { return true }
|
if (!recipient.address.isClosedGroup) { return true }
|
||||||
val threadID = DatabaseFactory.getThreadDatabase(context).getThreadIdFor(recipient)
|
val threadID = DatabaseFactory.getThreadDatabase(context).getThreadIdFor(recipient)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user