Groups V2 state comparison and gap handling.

This commit is contained in:
Alan Evans
2020-07-08 18:10:58 -03:00
committed by Greyson Parrelli
parent c9d2cef58d
commit 9ac9ace6b8
17 changed files with 990 additions and 113 deletions

View File

@@ -9,17 +9,17 @@ import java.util.Collection;
*/
final class AdvanceGroupStateResult {
@NonNull private final Collection<GroupLogEntry> processedLogEntries;
@NonNull private final GlobalGroupState newGlobalGroupState;
@NonNull private final Collection<LocalGroupLogEntry> processedLogEntries;
@NonNull private final GlobalGroupState newGlobalGroupState;
AdvanceGroupStateResult(@NonNull Collection<GroupLogEntry> processedLogEntries,
AdvanceGroupStateResult(@NonNull Collection<LocalGroupLogEntry> processedLogEntries,
@NonNull GlobalGroupState newGlobalGroupState)
{
this.processedLogEntries = processedLogEntries;
this.newGlobalGroupState = newGlobalGroupState;
}
@NonNull Collection<GroupLogEntry> getProcessedLogEntries() {
@NonNull Collection<LocalGroupLogEntry> getProcessedLogEntries() {
return processedLogEntries;
}

View File

@@ -13,31 +13,42 @@ import java.util.List;
*/
final class GlobalGroupState {
@Nullable private final DecryptedGroup localState;
@NonNull private final List<GroupLogEntry> history;
@Nullable private final DecryptedGroup localState;
@NonNull private final List<ServerGroupLogEntry> serverHistory;
GlobalGroupState(@Nullable DecryptedGroup localState,
@NonNull List<GroupLogEntry> serverStates)
@NonNull List<ServerGroupLogEntry> serverHistory)
{
this.localState = localState;
this.history = serverStates;
this.localState = localState;
this.serverHistory = serverHistory;
}
@Nullable DecryptedGroup getLocalState() {
return localState;
}
@NonNull Collection<GroupLogEntry> getHistory() {
return history;
@NonNull Collection<ServerGroupLogEntry> getServerHistory() {
return serverHistory;
}
int getEarliestRevisionNumber() {
if (localState != null) {
return localState.getRevision();
} else {
if (serverHistory.isEmpty()) {
throw new AssertionError();
}
return serverHistory.get(0).getRevision();
}
}
int getLatestRevisionNumber() {
if (history.isEmpty()) {
if (serverHistory.isEmpty()) {
if (localState == null) {
throw new AssertionError();
}
return localState.getRevision();
}
return history.get(history.size() - 1).getGroup().getRevision();
return serverHistory.get(serverHistory.size() - 1).getRevision();
}
}

View File

@@ -3,16 +3,24 @@ package org.thoughtcrime.securesms.groups.v2.processing;
import androidx.annotation.NonNull;
import org.signal.storageservice.protos.groups.local.DecryptedGroup;
import org.signal.storageservice.protos.groups.local.DecryptedGroupChange;
import org.thoughtcrime.securesms.logging.Log;
import org.whispersystems.signalservice.api.groupsv2.DecryptedGroupUtil;
import org.whispersystems.signalservice.api.groupsv2.GroupChangeReconstruct;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Objects;
final class GroupStateMapper {
private static final String TAG = Log.tag(GroupStateMapper.class);
static final int LATEST = Integer.MAX_VALUE;
private static final Comparator<GroupLogEntry> BY_REVISION = (o1, o2) -> Integer.compare(o1.getGroup().getRevision(), o2.getGroup().getRevision());
private static final Comparator<ServerGroupLogEntry> BY_REVISION = (o1, o2) -> Integer.compare(o1.getRevision(), o2.getRevision());
private GroupStateMapper() {
}
@@ -27,37 +35,88 @@ final class GroupStateMapper {
static @NonNull AdvanceGroupStateResult partiallyAdvanceGroupState(@NonNull GlobalGroupState inputState,
int maximumRevisionToApply)
{
final ArrayList<GroupLogEntry> statesToApplyNow = new ArrayList<>(inputState.getHistory().size());
final ArrayList<GroupLogEntry> statesToApplyLater = new ArrayList<>(inputState.getHistory().size());
final DecryptedGroup newLocalState;
final GlobalGroupState newGlobalGroupState;
ArrayList<LocalGroupLogEntry> appliedChanges = new ArrayList<>(inputState.getServerHistory().size());
HashMap<Integer, ServerGroupLogEntry> statesToApplyNow = new HashMap<>(inputState.getServerHistory().size());
ArrayList<ServerGroupLogEntry> statesToApplyLater = new ArrayList<>(inputState.getServerHistory().size());
DecryptedGroup current = inputState.getLocalState();
for (GroupLogEntry entry : inputState.getHistory()) {
if (inputState.getLocalState() != null &&
inputState.getLocalState().getRevision() >= entry.getGroup().getRevision())
{
if (inputState.getServerHistory().isEmpty()) {
return new AdvanceGroupStateResult(Collections.emptyList(), new GlobalGroupState(current, Collections.emptyList()));
}
for (ServerGroupLogEntry entry : inputState.getServerHistory()) {
if (entry.getRevision() > maximumRevisionToApply) {
statesToApplyLater.add(entry);
} else {
statesToApplyNow.put(entry.getRevision(), entry);
}
}
Collections.sort(statesToApplyLater, BY_REVISION);
final int from = inputState.getEarliestRevisionNumber();
final int to = Math.min(inputState.getLatestRevisionNumber(), maximumRevisionToApply);
for (int revision = from; revision >= 0 && revision <= to; revision++) {
ServerGroupLogEntry entry = statesToApplyNow.get(revision);
if (entry == null) {
Log.w(TAG, "Could not find group log on server V" + revision);
continue;
}
if (entry.getGroup().getRevision() > maximumRevisionToApply) {
statesToApplyLater.add(entry);
} else {
statesToApplyNow.add(entry);
DecryptedGroup groupAtRevision = entry.getGroup();
DecryptedGroupChange changeAtRevision = entry.getChange();
if (current == null) {
Log.w(TAG, "No local state, accepting server state for V" + revision);
current = groupAtRevision;
if (groupAtRevision != null) {
appliedChanges.add(new LocalGroupLogEntry(groupAtRevision, changeAtRevision));
}
continue;
}
if (current.getRevision() + 1 != revision) {
Log.w(TAG, "Detected gap V" + revision);
}
if (changeAtRevision == null) {
Log.w(TAG, "Reconstructing change for V" + revision);
changeAtRevision = GroupChangeReconstruct.reconstructGroupChange(current, Objects.requireNonNull(groupAtRevision));
}
DecryptedGroup groupWithChangeApplied;
try {
groupWithChangeApplied = DecryptedGroupUtil.applyWithoutRevisionCheck(current, changeAtRevision);
} catch (DecryptedGroupUtil.NotAbleToApplyChangeException e) {
Log.w(TAG, "Unable to apply V" + revision, e);
continue;
}
if (groupAtRevision == null) {
Log.w(TAG, "Reconstructing state for V" + revision);
groupAtRevision = groupWithChangeApplied;
}
if (current.getRevision() != groupAtRevision.getRevision()) {
appliedChanges.add(new LocalGroupLogEntry(groupAtRevision, changeAtRevision));
} else {
DecryptedGroupChange sameRevisionDelta = GroupChangeReconstruct.reconstructGroupChange(current, groupAtRevision);
if (!DecryptedGroupUtil.changeIsEmpty(sameRevisionDelta)) {
appliedChanges.add(new LocalGroupLogEntry(groupAtRevision, sameRevisionDelta));
Log.w(TAG, "Inserted repair change for mismatch V" + revision);
}
}
DecryptedGroupChange missingChanges = GroupChangeReconstruct.reconstructGroupChange(groupWithChangeApplied, groupAtRevision);
if (!DecryptedGroupUtil.changeIsEmpty(missingChanges)) {
appliedChanges.add(new LocalGroupLogEntry(groupAtRevision, missingChanges));
Log.w(TAG, "Inserted repair change for gap V" + revision);
}
current = groupAtRevision;
}
Collections.sort(statesToApplyNow, BY_REVISION);
Collections.sort(statesToApplyLater, BY_REVISION);
if (statesToApplyNow.size() > 0) {
newLocalState = statesToApplyNow.get(statesToApplyNow.size() - 1)
.getGroup();
} else {
newLocalState = inputState.getLocalState();
}
newGlobalGroupState = new GlobalGroupState(newLocalState, statesToApplyLater);
return new AdvanceGroupStateResult(statesToApplyNow, newGlobalGroupState);
return new AdvanceGroupStateResult(appliedChanges, new GlobalGroupState(current, statesToApplyLater));
}
}

View File

@@ -28,6 +28,7 @@ import org.thoughtcrime.securesms.groups.v2.ProfileKeySet;
import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobs.AvatarGroupsV2DownloadJob;
import org.thoughtcrime.securesms.jobs.RetrieveProfileJob;
import org.thoughtcrime.securesms.keyvalue.SignalStore;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.mms.MmsException;
import org.thoughtcrime.securesms.mms.OutgoingGroupUpdateMessage;
@@ -99,7 +100,7 @@ public final class GroupsV2StateProcessor {
public static class GroupUpdateResult {
private final GroupState groupState;
@Nullable private DecryptedGroup latestServer;
@Nullable private final DecryptedGroup latestServer;
GroupUpdateResult(@NonNull GroupState groupState, @Nullable DecryptedGroup latestServer) {
this.groupState = groupState;
@@ -152,13 +153,17 @@ public final class GroupsV2StateProcessor {
localState.getRevision() + 1 == signedGroupChange.getRevision() &&
revision == signedGroupChange.getRevision())
{
try {
Log.i(TAG, "Applying P2P group change");
DecryptedGroup newState = DecryptedGroupUtil.apply(localState, signedGroupChange);
if (SignalStore.internalValues().gv2IgnoreP2PChanges()) {
Log.w(TAG, "Ignoring P2P group change by setting");
} else {
try {
Log.i(TAG, "Applying P2P group change");
DecryptedGroup newState = DecryptedGroupUtil.apply(localState, signedGroupChange);
inputGroupState = new GlobalGroupState(localState, Collections.singletonList(new GroupLogEntry(newState, signedGroupChange)));
} catch (DecryptedGroupUtil.NotAbleToApplyChangeException e) {
Log.w(TAG, "Unable to apply P2P group change", e);
inputGroupState = new GlobalGroupState(localState, Collections.singletonList(new ServerGroupLogEntry(newState, signedGroupChange)));
} catch (DecryptedGroupUtil.NotAbleToApplyChangeException e) {
Log.w(TAG, "Unable to apply P2P group change", e);
}
}
}
@@ -186,7 +191,7 @@ public final class GroupsV2StateProcessor {
persistLearnedProfileKeys(inputGroupState);
GlobalGroupState remainingWork = advanceGroupStateResult.getNewGlobalGroupState();
if (remainingWork.getHistory().size() > 0) {
if (remainingWork.getServerHistory().size() > 0) {
Log.i(TAG, String.format(Locale.US, "There are more revisions on the server for this group, not applying at this time, V[%d..%d]", newLocalState.getRevision() + 1, remainingWork.getLatestRevisionNumber()));
}
@@ -270,9 +275,9 @@ public final class GroupsV2StateProcessor {
}
}
private void insertUpdateMessages(long timestamp, Collection<GroupLogEntry> processedLogEntries) {
for (GroupLogEntry entry : processedLogEntries) {
if (entry.getChange() != null && DecryptedGroupUtil.changeIsEmptyExceptForProfileKeyChanges(entry.getChange())) {
private void insertUpdateMessages(long timestamp, Collection<LocalGroupLogEntry> processedLogEntries) {
for (LocalGroupLogEntry entry : processedLogEntries) {
if (entry.getChange() != null && DecryptedGroupUtil.changeIsEmptyExceptForProfileKeyChanges(entry.getChange()) && !DecryptedGroupUtil.changeIsEmpty(entry.getChange())) {
Log.d(TAG, "Skipping profile key changes only update message");
} else {
storeMessage(GroupProtoUtil.createDecryptedGroupV2Context(masterKey, entry.getGroup(), entry.getChange(), null), timestamp);
@@ -283,9 +288,9 @@ public final class GroupsV2StateProcessor {
private void persistLearnedProfileKeys(@NonNull GlobalGroupState globalGroupState) {
final ProfileKeySet profileKeys = new ProfileKeySet();
for (GroupLogEntry entry : globalGroupState.getHistory()) {
for (ServerGroupLogEntry entry : globalGroupState.getServerHistory()) {
Optional<UUID> editor = DecryptedGroupUtil.editorUuid(entry.getChange());
if (editor.isPresent()) {
if (editor.isPresent() && entry.getGroup() != null) {
profileKeys.addKeysFromGroupState(entry.getGroup(), editor.get());
}
}
@@ -301,9 +306,9 @@ public final class GroupsV2StateProcessor {
private @NonNull GlobalGroupState queryServer(@Nullable DecryptedGroup localState, boolean latestOnly)
throws IOException, GroupNotAMemberException
{
DecryptedGroup latestServerGroup;
List<GroupLogEntry> history;
UUID selfUuid = Recipient.self().getUuid().get();
UUID selfUuid = Recipient.self().getUuid().get();
DecryptedGroup latestServerGroup;
List<ServerGroupLogEntry> history;
try {
latestServerGroup = groupsV2Api.getGroup(groupSecretParams, groupsV2Authorization.getAuthorizationForToday(selfUuid, groupSecretParams));
@@ -314,7 +319,7 @@ public final class GroupsV2StateProcessor {
}
if (latestOnly || !GroupProtoUtil.isMember(selfUuid, latestServerGroup.getMembersList())) {
history = Collections.singletonList(new GroupLogEntry(latestServerGroup, null));
history = Collections.singletonList(new ServerGroupLogEntry(latestServerGroup, null));
} else {
int revisionWeWereAdded = GroupProtoUtil.findRevisionWeWereAdded(latestServerGroup, selfUuid);
int logsNeededFrom = localState != null ? Math.max(localState.getRevision(), revisionWeWereAdded) : revisionWeWereAdded;
@@ -325,13 +330,18 @@ public final class GroupsV2StateProcessor {
return new GlobalGroupState(localState, history);
}
private List<GroupLogEntry> getFullMemberHistory(@NonNull UUID selfUuid, int logsNeededFromRevision) throws IOException {
private List<ServerGroupLogEntry> getFullMemberHistory(@NonNull UUID selfUuid, int logsNeededFromRevision) throws IOException {
try {
Collection<DecryptedGroupHistoryEntry> groupStatesFromRevision = groupsV2Api.getGroupHistory(groupSecretParams, logsNeededFromRevision, groupsV2Authorization.getAuthorizationForToday(selfUuid, groupSecretParams));
ArrayList<GroupLogEntry> history = new ArrayList<>(groupStatesFromRevision.size());
ArrayList<ServerGroupLogEntry> history = new ArrayList<>(groupStatesFromRevision.size());
boolean ignoreServerChanges = SignalStore.internalValues().gv2IgnoreServerChanges();
if (ignoreServerChanges) {
Log.w(TAG, "Server change logs are ignored by setting");
}
for (DecryptedGroupHistoryEntry entry : groupStatesFromRevision) {
history.add(new GroupLogEntry(entry.getGroup(), entry.getChange()));
history.add(new ServerGroupLogEntry(entry.getGroup(), ignoreServerChanges ? null : entry.getChange()));
}
return history;
@@ -343,12 +353,7 @@ public final class GroupsV2StateProcessor {
private void storeMessage(@NonNull DecryptedGroupV2Context decryptedGroupV2Context, long timestamp) {
Optional<UUID> editor = getEditor(decryptedGroupV2Context);
if (!editor.isPresent() || UuidUtil.UNKNOWN_UUID.equals(editor.get())) {
Log.w(TAG, "Cannot determine editor of change, can't insert message");
return;
}
boolean outgoing = Recipient.self().requireUuid().equals(editor.get());
boolean outgoing = !editor.isPresent() || Recipient.self().requireUuid().equals(editor.get());
if (outgoing) {
try {

View File

@@ -6,17 +6,21 @@ import androidx.annotation.Nullable;
import org.signal.storageservice.protos.groups.local.DecryptedGroup;
import org.signal.storageservice.protos.groups.local.DecryptedGroupChange;
import java.util.Objects;
/**
* Pair of a group state and optionally the corresponding change.
* <p>
* Similar to {@link ServerGroupLogEntry} but guaranteed to have a group state.
* <p>
* Changes are typically not available for pending members.
*/
final class GroupLogEntry {
final class LocalGroupLogEntry {
@NonNull private final DecryptedGroup group;
@Nullable private final DecryptedGroupChange change;
GroupLogEntry(@NonNull DecryptedGroup group, @Nullable DecryptedGroupChange change) {
LocalGroupLogEntry(@NonNull DecryptedGroup group, @Nullable DecryptedGroupChange change) {
if (change != null && group.getRevision() != change.getRevision()) {
throw new AssertionError();
}
@@ -32,4 +36,20 @@ final class GroupLogEntry {
@Nullable DecryptedGroupChange getChange() {
return change;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof LocalGroupLogEntry)) return false;
LocalGroupLogEntry other = (LocalGroupLogEntry) o;
return group.equals(other.group) && Objects.equals(change, other.change);
}
@Override
public int hashCode() {
int result = group.hashCode();
result = 31 * result + (change != null ? change.hashCode() : 0);
return result;
}
}

View File

@@ -0,0 +1,50 @@
package org.thoughtcrime.securesms.groups.v2.processing;
import androidx.annotation.Nullable;
import org.signal.storageservice.protos.groups.local.DecryptedGroup;
import org.signal.storageservice.protos.groups.local.DecryptedGroupChange;
import org.thoughtcrime.securesms.logging.Log;
/**
* Pair of a group state and optionally the corresponding change from the server.
* <p>
* Either the group or change may be empty.
* <p>
* Changes are typically not available for pending members.
*/
final class ServerGroupLogEntry {
private static final String TAG = Log.tag(ServerGroupLogEntry.class);
@Nullable private final DecryptedGroup group;
@Nullable private final DecryptedGroupChange change;
ServerGroupLogEntry(@Nullable DecryptedGroup group, @Nullable DecryptedGroupChange change) {
if (change != null && group != null && group.getRevision() != change.getRevision()) {
Log.w(TAG, "Ignoring change with revision number not matching group");
change = null;
}
if (change == null && group == null) {
throw new AssertionError();
}
this.group = group;
this.change = change;
}
@Nullable DecryptedGroup getGroup() {
return group;
}
@Nullable DecryptedGroupChange getChange() {
return change;
}
int getRevision() {
if (group != null) return group.getRevision();
else if (change != null) return change.getRevision();
else throw new AssertionError();
}
}

View File

@@ -4,7 +4,9 @@ import org.thoughtcrime.securesms.util.FeatureFlags;
public final class InternalValues extends SignalStoreValues {
public static final String GV2_FORCE_INVITES = "internal.gv2.force_invites";
public static final String GV2_FORCE_INVITES = "internal.gv2.force_invites";
public static final String GV2_IGNORE_SERVER_CHANGES = "internal.gv2.ignore_server_changes";
public static final String GV2_IGNORE_P2P_CHANGES = "internal.gv2.ignore_p2p_changes";
InternalValues(KeyValueStore store) {
super(store);
@@ -17,4 +19,25 @@ public final class InternalValues extends SignalStoreValues {
public synchronized boolean forceGv2Invites() {
return FeatureFlags.internalUser() && getBoolean(GV2_FORCE_INVITES, false);
}
/**
* The Server will leave out changes that can only be described by a future protocol level that
* an older client cannot understand. Ignoring those changes by nulling them out simulates that
* scenario for testing.
* <p>
* In conjunction with {@link #gv2IgnoreP2PChanges()} it means no group changes are coming into
* the client and it will generate changes by group state comparison, and those changes will not
* have an editor and so will be in the passive voice.
*/
public synchronized boolean gv2IgnoreServerChanges() {
return FeatureFlags.internalUser() && getBoolean(GV2_IGNORE_SERVER_CHANGES, false);
}
/**
* Signed group changes are sent P2P, if the client ignores them, it will then ask the server
* directly which allows testing of certain testing scenarios.
*/
public synchronized boolean gv2IgnoreP2PChanges() {
return FeatureFlags.internalUser() && getBoolean(GV2_IGNORE_P2P_CHANGES, false);
}
}

View File

@@ -1,13 +1,19 @@
package org.thoughtcrime.securesms.preferences;
import android.os.Bundle;
import android.widget.Toast;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.preference.PreferenceDataStore;
import org.thoughtcrime.securesms.ApplicationPreferencesActivity;
import org.thoughtcrime.securesms.R;
import org.thoughtcrime.securesms.components.SwitchPreferenceCompat;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobs.RefreshAttributesJob;
import org.thoughtcrime.securesms.jobs.RefreshOwnProfileJob;
import org.thoughtcrime.securesms.jobs.RotateProfileKeyJob;
import org.thoughtcrime.securesms.keyvalue.InternalValues;
import org.thoughtcrime.securesms.keyvalue.SignalStore;
import org.thoughtcrime.securesms.logging.Log;
@@ -26,14 +32,39 @@ public class InternalOptionsPreferenceFragment extends CorrectedPreferenceFragme
PreferenceDataStore preferenceDataStore = SignalStore.getPreferenceDataStore();
SwitchPreferenceCompat forceGv2Preference = (SwitchPreferenceCompat) this.findPreference(InternalValues.GV2_FORCE_INVITES);
initializeSwitchPreference(preferenceDataStore, InternalValues.GV2_FORCE_INVITES, SignalStore.internalValues().forceGv2Invites());
initializeSwitchPreference(preferenceDataStore, InternalValues.GV2_IGNORE_SERVER_CHANGES, SignalStore.internalValues().gv2IgnoreServerChanges());
initializeSwitchPreference(preferenceDataStore, InternalValues.GV2_IGNORE_P2P_CHANGES, SignalStore.internalValues().gv2IgnoreP2PChanges());
findPreference("pref_refresh_attributes").setOnPreferenceClickListener(preference -> {
ApplicationDependencies.getJobManager()
.startChain(new RefreshAttributesJob())
.then(new RefreshOwnProfileJob())
.enqueue();
Toast.makeText(getContext(), "Scheduled attribute refresh", Toast.LENGTH_SHORT).show();
return true;
});
findPreference("pref_rotate_profile_key").setOnPreferenceClickListener(preference -> {
ApplicationDependencies.getJobManager().add(new RotateProfileKeyJob());
Toast.makeText(getContext(), "Scheduled profile key rotation", Toast.LENGTH_SHORT).show();
return true;
});
}
private void initializeSwitchPreference(@NonNull PreferenceDataStore preferenceDataStore,
@NonNull String key,
boolean checked)
{
SwitchPreferenceCompat forceGv2Preference = (SwitchPreferenceCompat) findPreference(key);
forceGv2Preference.setPreferenceDataStore(preferenceDataStore);
forceGv2Preference.setChecked(SignalStore.internalValues().forceGv2Invites());
forceGv2Preference.setChecked(checked);
}
@Override
public void onResume() {
super.onResume();
//noinspection ConstantConditions
((ApplicationPreferencesActivity) getActivity()).getSupportActionBar().setTitle(R.string.preferences__internal_preferences);
}
}