From a299bafe89ae67084ca93cdd219f22a5db9bb55b Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Wed, 20 May 2020 09:06:08 -0400 Subject: [PATCH] Create a new system for fetching the intial batch of messages. --- app/src/main/AndroidManifest.xml | 2 +- .../securesms/ApplicationContext.java | 34 +++- .../dependencies/ApplicationDependencies.java | 31 ++-- .../ApplicationDependencyProvider.java | 16 +- .../securesms/gcm/FcmFetchService.java | 6 +- .../securesms/gcm/FcmJobService.java | 10 +- .../jobs/PushNotificationReceiveJob.java | 8 +- .../BackgroundMessageRetriever.java} | 92 ++++++----- .../IncomingMessageObserver.java | 21 ++- .../IncomingMessageProcessor.java | 2 +- .../messages/InitialMessageRetriever.java | 151 ++++++++++++++++++ .../messages/MessageRetrievalStrategy.java | 91 +++++++++++ .../{gcm => messages}/RestStrategy.java | 43 +---- .../securesms/messages/WebsocketStrategy.java | 95 +++++++++++ .../notifications/MessageNotifier.java | 3 +- .../securesms/util/ProfileUtil.java | 2 +- .../api/SignalServiceMessagePipe.java | 32 +++- 17 files changed, 522 insertions(+), 117 deletions(-) rename app/src/main/java/org/thoughtcrime/securesms/{gcm/MessageRetriever.java => messages/BackgroundMessageRetriever.java} (52%) rename app/src/main/java/org/thoughtcrime/securesms/{service => messages}/IncomingMessageObserver.java (92%) rename app/src/main/java/org/thoughtcrime/securesms/{ => messages}/IncomingMessageProcessor.java (98%) create mode 100644 app/src/main/java/org/thoughtcrime/securesms/messages/InitialMessageRetriever.java create mode 100644 app/src/main/java/org/thoughtcrime/securesms/messages/MessageRetrievalStrategy.java rename app/src/main/java/org/thoughtcrime/securesms/{gcm => messages}/RestStrategy.java (78%) create mode 100644 app/src/main/java/org/thoughtcrime/securesms/messages/WebsocketStrategy.java diff --git a/app/src/main/AndroidManifest.xml b/app/src/main/AndroidManifest.xml index 1c7ff7384f..b243af7b4c 100644 --- a/app/src/main/AndroidManifest.xml +++ b/app/src/main/AndroidManifest.xml @@ -493,7 +493,7 @@ - + { + long startTime = System.currentTimeMillis(); + + switch (retriever.begin(TimeUnit.SECONDS.toMillis(60))) { + case SUCCESS: + Log.i(TAG, "Successfully caught up on messages. " + (System.currentTimeMillis() - startTime) + " ms"); + break; + case FAILURE_TIMEOUT: + Log.w(TAG, "Did not finish catching up due to a timeout. " + (System.currentTimeMillis() - startTime) + " ms"); + break; + case FAILURE_ERROR: + Log.w(TAG, "Did not finish catching up due to an error. " + (System.currentTimeMillis() - startTime) + " ms"); + break; + case SKIPPED_ALREADY_CAUGHT_UP: + Log.i(TAG, "Already caught up. " + (System.currentTimeMillis() - startTime) + " ms"); + break; + case SKIPPED_ALREADY_RUNNING: + Log.i(TAG, "Already in the process of catching up. " + (System.currentTimeMillis() - startTime) + " ms"); + break; + } + }); + } + @Override protected void attachBaseContext(Context base) { super.attachBaseContext(DynamicLanguageContextWrapper.updateContext(base, TextSecurePreferences.getLanguage(base))); diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java index 29a3dcda1e..d1398d0f75 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java @@ -5,17 +5,18 @@ import android.app.Application; import androidx.annotation.NonNull; import org.thoughtcrime.securesms.BuildConfig; -import org.thoughtcrime.securesms.IncomingMessageProcessor; -import org.thoughtcrime.securesms.gcm.MessageRetriever; +import org.thoughtcrime.securesms.messages.IncomingMessageProcessor; +import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever; import org.thoughtcrime.securesms.groups.GroupsV2AuthorizationMemoryValueCache; import org.thoughtcrime.securesms.groups.v2.processing.GroupsV2StateProcessor; import org.thoughtcrime.securesms.jobmanager.JobManager; import org.thoughtcrime.securesms.keyvalue.KeyValueStore; import org.thoughtcrime.securesms.keyvalue.SignalStore; import org.thoughtcrime.securesms.megaphone.MegaphoneRepository; +import org.thoughtcrime.securesms.messages.InitialMessageRetriever; import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess; import org.thoughtcrime.securesms.recipients.LiveRecipientCache; -import org.thoughtcrime.securesms.service.IncomingMessageObserver; +import org.thoughtcrime.securesms.messages.IncomingMessageObserver; import org.thoughtcrime.securesms.util.EarlyMessageCache; import org.thoughtcrime.securesms.util.FeatureFlags; import org.thoughtcrime.securesms.util.FrameRateTracker; @@ -45,7 +46,7 @@ public class ApplicationDependencies { private static SignalServiceMessageSender messageSender; private static SignalServiceMessageReceiver messageReceiver; private static IncomingMessageProcessor incomingMessageProcessor; - private static MessageRetriever messageRetriever; + private static BackgroundMessageRetriever backgroundMessageRetriever; private static LiveRecipientCache recipientCache; private static JobManager jobManager; private static FrameRateTracker frameRateTracker; @@ -55,6 +56,7 @@ public class ApplicationDependencies { private static GroupsV2StateProcessor groupsV2StateProcessor; private static GroupsV2Operations groupsV2Operations; private static EarlyMessageCache earlyMessageCache; + private static InitialMessageRetriever initialMessageRetriever; public static synchronized void init(@NonNull Application application, @NonNull Provider provider) { if (ApplicationDependencies.application != null || ApplicationDependencies.provider != null) { @@ -164,14 +166,14 @@ public class ApplicationDependencies { return incomingMessageProcessor; } - public static synchronized @NonNull MessageRetriever getMessageRetriever() { + public static synchronized @NonNull BackgroundMessageRetriever getBackgroundMessageRetriever() { assertInitialization(); - if (messageRetriever == null) { - messageRetriever = provider.provideMessageRetriever(); + if (backgroundMessageRetriever == null) { + backgroundMessageRetriever = provider.provideBackgroundMessageRetriever(); } - return messageRetriever; + return backgroundMessageRetriever; } public static synchronized @NonNull LiveRecipientCache getRecipientCache() { @@ -234,6 +236,16 @@ public class ApplicationDependencies { return earlyMessageCache; } + public static synchronized @NonNull InitialMessageRetriever getInitialMessageRetriever() { + assertInitialization(); + + if (initialMessageRetriever == null) { + initialMessageRetriever = provider.provideInitialMessageRetriever(); + } + + return initialMessageRetriever; + } + private static void assertInitialization() { if (application == null || provider == null) { throw new UninitializedException(); @@ -247,13 +259,14 @@ public class ApplicationDependencies { @NonNull SignalServiceMessageReceiver provideSignalServiceMessageReceiver(); @NonNull SignalServiceNetworkAccess provideSignalServiceNetworkAccess(); @NonNull IncomingMessageProcessor provideIncomingMessageProcessor(); - @NonNull MessageRetriever provideMessageRetriever(); + @NonNull BackgroundMessageRetriever provideBackgroundMessageRetriever(); @NonNull LiveRecipientCache provideRecipientCache(); @NonNull JobManager provideJobManager(); @NonNull FrameRateTracker provideFrameRateTracker(); @NonNull KeyValueStore provideKeyValueStore(); @NonNull MegaphoneRepository provideMegaphoneRepository(); @NonNull EarlyMessageCache provideEarlyMessageCache(); + @NonNull InitialMessageRetriever provideInitialMessageRetriever(); } private static class UninitializedException extends IllegalStateException { diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java index 20210a72be..7237dcfab0 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java @@ -7,11 +7,11 @@ import androidx.annotation.NonNull; import org.greenrobot.eventbus.EventBus; import org.thoughtcrime.securesms.BuildConfig; -import org.thoughtcrime.securesms.IncomingMessageProcessor; +import org.thoughtcrime.securesms.messages.IncomingMessageProcessor; import org.thoughtcrime.securesms.crypto.storage.SignalProtocolStoreImpl; import org.thoughtcrime.securesms.database.DatabaseFactory; import org.thoughtcrime.securesms.events.ReminderUpdateEvent; -import org.thoughtcrime.securesms.gcm.MessageRetriever; +import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever; import org.thoughtcrime.securesms.jobmanager.JobManager; import org.thoughtcrime.securesms.jobmanager.JobMigrator; import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer; @@ -20,10 +20,11 @@ import org.thoughtcrime.securesms.jobs.JobManagerFactories; import org.thoughtcrime.securesms.keyvalue.KeyValueStore; import org.thoughtcrime.securesms.logging.Log; import org.thoughtcrime.securesms.megaphone.MegaphoneRepository; +import org.thoughtcrime.securesms.messages.InitialMessageRetriever; import org.thoughtcrime.securesms.push.SecurityEventListener; import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess; import org.thoughtcrime.securesms.recipients.LiveRecipientCache; -import org.thoughtcrime.securesms.service.IncomingMessageObserver; +import org.thoughtcrime.securesms.messages.IncomingMessageObserver; import org.thoughtcrime.securesms.util.AlarmSleepTimer; import org.thoughtcrime.securesms.util.EarlyMessageCache; import org.thoughtcrime.securesms.util.FeatureFlags; @@ -111,8 +112,8 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr } @Override - public @NonNull MessageRetriever provideMessageRetriever() { - return new MessageRetriever(); + public @NonNull BackgroundMessageRetriever provideBackgroundMessageRetriever() { + return new BackgroundMessageRetriever(); } @Override @@ -152,6 +153,11 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr return new EarlyMessageCache(); } + @Override + public @NonNull InitialMessageRetriever provideInitialMessageRetriever() { + return new InitialMessageRetriever(); + } + private static class DynamicCredentialsProvider implements CredentialsProvider { private final Context context; diff --git a/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmFetchService.java b/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmFetchService.java index e7a2abd3c8..d62e7c363d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmFetchService.java +++ b/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmFetchService.java @@ -12,6 +12,8 @@ import com.google.firebase.messaging.RemoteMessage; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.jobs.PushNotificationReceiveJob; import org.thoughtcrime.securesms.logging.Log; +import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever; +import org.thoughtcrime.securesms.messages.RestStrategy; import org.thoughtcrime.securesms.util.concurrent.SerialMonoLifoExecutor; import org.thoughtcrime.securesms.util.concurrent.SignalExecutors; @@ -68,8 +70,8 @@ public class FcmFetchService extends Service { } private void fetch() { - MessageRetriever retriever = ApplicationDependencies.getMessageRetriever(); - boolean success = retriever.retrieveMessages(this, new RestStrategy(), new RestStrategy()); + BackgroundMessageRetriever retriever = ApplicationDependencies.getBackgroundMessageRetriever(); + boolean success = retriever.retrieveMessages(this, new RestStrategy(), new RestStrategy()); if (success) { Log.i(TAG, "Successfully retrieved messages."); diff --git a/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmJobService.java b/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmJobService.java index 1049681f4d..61480432f3 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmJobService.java +++ b/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmJobService.java @@ -11,6 +11,8 @@ import androidx.annotation.RequiresApi; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.logging.Log; +import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever; +import org.thoughtcrime.securesms.messages.RestStrategy; import org.thoughtcrime.securesms.util.ServiceUtil; import org.thoughtcrime.securesms.util.TextSecurePreferences; import org.thoughtcrime.securesms.util.concurrent.SignalExecutors; @@ -39,15 +41,15 @@ public class FcmJobService extends JobService { public boolean onStartJob(JobParameters params) { Log.d(TAG, "onStartJob()"); - if (MessageRetriever.shouldIgnoreFetch(this)) { + if (BackgroundMessageRetriever.shouldIgnoreFetch(this)) { Log.i(TAG, "App is foregrounded. No need to run."); return false; } SignalExecutors.UNBOUNDED.execute(() -> { - Context context = getApplicationContext(); - MessageRetriever retriever = ApplicationDependencies.getMessageRetriever(); - boolean success = retriever.retrieveMessages(context, new RestStrategy(), new RestStrategy()); + Context context = getApplicationContext(); + BackgroundMessageRetriever retriever = ApplicationDependencies.getBackgroundMessageRetriever(); + boolean success = retriever.retrieveMessages(context, new RestStrategy(), new RestStrategy()); if (success) { Log.i(TAG, "Successfully retrieved messages."); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushNotificationReceiveJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushNotificationReceiveJob.java index c7add3571b..dfa683e539 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushNotificationReceiveJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushNotificationReceiveJob.java @@ -4,8 +4,8 @@ import android.content.Context; import androidx.annotation.NonNull; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; -import org.thoughtcrime.securesms.gcm.MessageRetriever; -import org.thoughtcrime.securesms.gcm.RestStrategy; +import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever; +import org.thoughtcrime.securesms.messages.RestStrategy; import org.thoughtcrime.securesms.jobmanager.Data; import org.thoughtcrime.securesms.jobmanager.Job; import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint; @@ -46,8 +46,8 @@ public class PushNotificationReceiveJob extends BaseJob { @Override public void onRun() throws IOException { - MessageRetriever retriever = ApplicationDependencies.getMessageRetriever(); - boolean result = retriever.retrieveMessages(context, new RestStrategy()); + BackgroundMessageRetriever retriever = ApplicationDependencies.getBackgroundMessageRetriever(); + boolean result = retriever.retrieveMessages(context, new RestStrategy()); if (result) { Log.i(TAG, "Successfully pulled messages."); diff --git a/app/src/main/java/org/thoughtcrime/securesms/gcm/MessageRetriever.java b/app/src/main/java/org/thoughtcrime/securesms/messages/BackgroundMessageRetriever.java similarity index 52% rename from app/src/main/java/org/thoughtcrime/securesms/gcm/MessageRetriever.java rename to app/src/main/java/org/thoughtcrime/securesms/messages/BackgroundMessageRetriever.java index 4cb5d23af5..08a2fa8372 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/gcm/MessageRetriever.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/BackgroundMessageRetriever.java @@ -1,4 +1,4 @@ -package org.thoughtcrime.securesms.gcm; +package org.thoughtcrime.securesms.messages; import android.content.Context; import android.os.PowerManager; @@ -19,21 +19,24 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** - * Facilitates the retrieval of messages via provided {@link Strategy}'s. + * Retrieves messages while the app is in the background via provided {@link MessageRetrievalStrategy}'s. */ -public class MessageRetriever { +public class BackgroundMessageRetriever { - private static final String TAG = Log.tag(MessageRetriever.class); + private static final String TAG = Log.tag(BackgroundMessageRetriever.class); private static final String WAKE_LOCK_TAG = "MessageRetriever"; private static final Semaphore ACTIVE_LOCK = new Semaphore(2); + private static final long CATCHUP_TIMEOUT = TimeUnit.SECONDS.toMillis(60); + private static final long NORMAL_TIMEOUT = TimeUnit.SECONDS.toMillis(10); + /** * @return False if the retrieval failed and should be rescheduled, otherwise true. */ @WorkerThread - public boolean retrieveMessages(@NonNull Context context, Strategy... strategies) { + public boolean retrieveMessages(@NonNull Context context, MessageRetrievalStrategy... strategies) { if (shouldIgnoreFetch(context)) { Log.i(TAG, "Skipping retrieval -- app is in the foreground."); return true; @@ -61,33 +64,21 @@ public class MessageRetriever { Log.w(TAG, "We may be operating in a constrained environment. Doze: " + doze + " Network: " + network); } - boolean success = false; - - for (Strategy strategy : strategies) { - if (shouldIgnoreFetch(context)) { - Log.i(TAG, "Stopping further strategy attempts -- app is in the foreground." + logSuffix(startTime)); - success = true; - break; - } - - Log.i(TAG, "Attempting strategy: " + strategy.toString() + logSuffix(startTime)); - - if (strategy.run()) { - Log.i(TAG, "Strategy succeeded: " + strategy.toString() + logSuffix(startTime)); - success = true; - break; - } else { - Log.w(TAG, "Strategy failed: " + strategy.toString() + logSuffix(startTime)); - } - } - - if (success) { - TextSecurePreferences.setNeedsMessagePull(context, false); + if (ApplicationDependencies.getInitialMessageRetriever().isCaughtUp()) { + Log.i(TAG, "Performing normal message fetch."); + return executeBackgroundRetrieval(context, startTime, strategies); } else { - Log.w(TAG, "All strategies failed!" + logSuffix(startTime)); + Log.i(TAG, "Performing initial message fetch."); + InitialMessageRetriever.Result result = ApplicationDependencies.getInitialMessageRetriever().begin(CATCHUP_TIMEOUT); + if (result == InitialMessageRetriever.Result.SUCCESS) { + Log.i(TAG, "Initial message request was completed successfully. " + logSuffix(startTime)); + TextSecurePreferences.setNeedsMessagePull(context, false); + return true; + } else { + Log.w(TAG, "Initial message fetch returned result " + result + ", so doing a normal message fetch."); + return executeBackgroundRetrieval(context, System.currentTimeMillis(), strategies); + } } - - return success; } finally { WakeLockUtil.release(wakeLock, WAKE_LOCK_TAG); ACTIVE_LOCK.release(); @@ -95,6 +86,36 @@ public class MessageRetriever { } } + private boolean executeBackgroundRetrieval(@NonNull Context context, long startTime, @NonNull MessageRetrievalStrategy[] strategies) { + boolean success = false; + + for (MessageRetrievalStrategy strategy : strategies) { + if (shouldIgnoreFetch(context)) { + Log.i(TAG, "Stopping further strategy attempts -- app is in the foreground." + logSuffix(startTime)); + success = true; + break; + } + + Log.i(TAG, "Attempting strategy: " + strategy.toString() + logSuffix(startTime)); + + if (strategy.execute(NORMAL_TIMEOUT)) { + Log.i(TAG, "Strategy succeeded: " + strategy.toString() + logSuffix(startTime)); + success = true; + break; + } else { + Log.w(TAG, "Strategy failed: " + strategy.toString() + logSuffix(startTime)); + } + } + + if (success) { + TextSecurePreferences.setNeedsMessagePull(context, false); + } else { + Log.w(TAG, "All strategies failed!" + logSuffix(startTime)); + } + + return success; + } + /** * @return True if there is no need to execute a message fetch, because the websocket will take * care of it. @@ -107,15 +128,4 @@ public class MessageRetriever { private static String logSuffix(long startTime) { return " (" + (System.currentTimeMillis() - startTime) + " ms elapsed)"; } - - /** - * A method of retrieving messages. - */ - public interface Strategy { - /** - * @return False if the message retrieval failed and should be retried, otherwise true. - */ - @WorkerThread - boolean run(); - } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/service/IncomingMessageObserver.java b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java similarity index 92% rename from app/src/main/java/org/thoughtcrime/securesms/service/IncomingMessageObserver.java rename to app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java index d8867e33ad..b9cf57a229 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/service/IncomingMessageObserver.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java @@ -1,4 +1,4 @@ -package org.thoughtcrime.securesms.service; +package org.thoughtcrime.securesms.messages; import android.app.Service; import androidx.lifecycle.DefaultLifecycleObserver; @@ -12,7 +12,7 @@ import androidx.annotation.Nullable; import androidx.core.app.NotificationCompat; import androidx.core.content.ContextCompat; -import org.thoughtcrime.securesms.IncomingMessageProcessor.Processor; +import org.thoughtcrime.securesms.messages.IncomingMessageProcessor.Processor; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.jobmanager.ConstraintObserver; import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint; @@ -71,6 +71,8 @@ public class IncomingMessageObserver implements ConstraintObserver.Notifier { onAppBackgrounded(); } }); + + ApplicationDependencies.getInitialMessageRetriever().addListener(this::onInitialRetrievalComplete); } @Override @@ -90,16 +92,21 @@ public class IncomingMessageObserver implements ConstraintObserver.Notifier { notifyAll(); } + private synchronized void onInitialRetrievalComplete() { + notifyAll(); + } + private synchronized boolean isConnectionNecessary() { boolean isGcmDisabled = TextSecurePreferences.isFcmDisabled(context); Log.d(TAG, String.format("Network requirement: %s, app visible: %s, gcm disabled: %b", networkConstraint.isMet(), appVisible, isGcmDisabled)); - return TextSecurePreferences.isPushRegistered(context) && - TextSecurePreferences.isWebsocketRegistered(context) && - (appVisible || isGcmDisabled) && - networkConstraint.isMet() && + return TextSecurePreferences.isPushRegistered(context) && + TextSecurePreferences.isWebsocketRegistered(context) && + (appVisible || isGcmDisabled) && + networkConstraint.isMet() && + ApplicationDependencies.getInitialMessageRetriever().isCaughtUp() && !networkAccess.isCensored(context); } @@ -156,7 +163,7 @@ public class IncomingMessageObserver implements ConstraintObserver.Notifier { Log.i(TAG, "Reading message..."); localPipe.read(REQUEST_TIMEOUT_MINUTES, TimeUnit.MINUTES, envelope -> { - Log.i(TAG, "Retrieved envelope! " + envelope.getSourceIdentifier()); + Log.i(TAG, "Retrieved envelope! " + envelope.getTimestamp()); try (Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) { processor.processEnvelope(envelope); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/IncomingMessageProcessor.java b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageProcessor.java similarity index 98% rename from app/src/main/java/org/thoughtcrime/securesms/IncomingMessageProcessor.java rename to app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageProcessor.java index 4668aee0f3..9f6af2313e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/IncomingMessageProcessor.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageProcessor.java @@ -1,4 +1,4 @@ -package org.thoughtcrime.securesms; +package org.thoughtcrime.securesms.messages; import android.content.Context; diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/InitialMessageRetriever.java b/app/src/main/java/org/thoughtcrime/securesms/messages/InitialMessageRetriever.java new file mode 100644 index 0000000000..b14c7d8a4a --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/InitialMessageRetriever.java @@ -0,0 +1,151 @@ +package org.thoughtcrime.securesms.messages; + +import android.content.Context; + +import androidx.annotation.AnyThread; +import androidx.annotation.NonNull; +import androidx.annotation.WorkerThread; + +import org.thoughtcrime.securesms.ApplicationContext; +import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; +import org.thoughtcrime.securesms.logging.Log; +import org.thoughtcrime.securesms.util.concurrent.SignalExecutors; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Fetches the first batch of messages, before anything else does. + * + * We have a separate process for fetching "initial" messages in order to have special behavior when + * catching up on a lot of messages after being offline for a while. It also gives us an opportunity + * to flag when we are "up-to-date" with our message queue. + */ +public class InitialMessageRetriever { + + private static final String TAG = Log.tag(InitialMessageRetriever.class); + + private static final int MAX_ATTEMPTS = 3; + + private final List listeners = new CopyOnWriteArrayList<>(); + + private State state = State.NOT_CAUGHT_UP; + + private final Object STATE_LOCK = new Object(); + + /** + * Only fires once. No need to remove. It will be called on an arbitrary worker thread. + */ + public void addListener(@NonNull Listener listener) { + synchronized (STATE_LOCK) { + if (state == State.CAUGHT_UP) { + listener.onCaughtUp(); + } else { + listeners.add(listener); + } + } + } + + /** + * Performs the initial fetch for messages (if necessary) with the requested timeout. The timeout + * is not just for the initial network request, but for the entire method call. + * + * @return A result describing how the operation completed. + */ + @WorkerThread + public @NonNull Result begin(long timeout) { + synchronized (STATE_LOCK) { + if (state == State.CAUGHT_UP) { + return Result.SKIPPED_ALREADY_CAUGHT_UP; + } else if (state == State.RUNNING) { + return Result.SKIPPED_ALREADY_RUNNING; + } + + state = State.RUNNING; + } + + long startTime = System.currentTimeMillis(); + + MessageRetrievalStrategy messageRetrievalStrategy = getRetriever(); + CountDownLatch latch = new CountDownLatch(1); + + SignalExecutors.UNBOUNDED.execute(() -> { + for (int i = 0; i < MAX_ATTEMPTS; i++) { + if (messageRetrievalStrategy.isCanceled()) { + Log.w(TAG, "Invalidated! Ending attempts."); + break; + } + + boolean success = getRetriever().execute(timeout); + + if (success) { + break; + } else { + Log.w(TAG, "Failed to catch up! Attempt " + (i + 1) + "/" + MAX_ATTEMPTS); + } + } + + latch.countDown(); + }); + + try { + boolean success = latch.await(timeout, TimeUnit.MILLISECONDS); + + synchronized (STATE_LOCK) { + state = State.CAUGHT_UP; + + for (Listener listener : listeners) { + listener.onCaughtUp(); + } + + listeners.clear(); + } + + if (success) { + Log.i(TAG, "Successfully caught up in " + (System.currentTimeMillis() - startTime) + " ms."); + return Result.SUCCESS; + } else { + Log.i(TAG, "Could not catch up completely. Hit the timeout of " + timeout + " ms."); + messageRetrievalStrategy.cancel(); + return Result.FAILURE_TIMEOUT; + } + } catch (InterruptedException e) { + Log.w(TAG, "Interrupted!", e); + return Result.FAILURE_ERROR; + } + } + + public boolean isCaughtUp() { + synchronized (STATE_LOCK) { + return state == State.CAUGHT_UP; + } + } + + private @NonNull MessageRetrievalStrategy getRetriever() { + Context context = ApplicationDependencies.getApplication(); + + if (ApplicationContext.getInstance(context).isAppVisible() && + !ApplicationDependencies.getSignalServiceNetworkAccess().isCensored(context)) + { + return new WebsocketStrategy(); + } else { + return new RestStrategy(); + } + } + + private enum State { + NOT_CAUGHT_UP, RUNNING, CAUGHT_UP + } + + public enum Result { + SUCCESS, FAILURE_TIMEOUT, FAILURE_ERROR, SKIPPED_ALREADY_CAUGHT_UP, SKIPPED_ALREADY_RUNNING + } + + public interface Listener { + @WorkerThread + void onCaughtUp(); + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/MessageRetrievalStrategy.java b/app/src/main/java/org/thoughtcrime/securesms/messages/MessageRetrievalStrategy.java new file mode 100644 index 0000000000..44123e8ec5 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/MessageRetrievalStrategy.java @@ -0,0 +1,91 @@ +package org.thoughtcrime.securesms.messages; + +import androidx.annotation.AnyThread; +import androidx.annotation.NonNull; +import androidx.annotation.WorkerThread; + +import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; +import org.thoughtcrime.securesms.jobmanager.Job; +import org.thoughtcrime.securesms.jobmanager.JobManager; +import org.thoughtcrime.securesms.jobmanager.JobTracker; +import org.thoughtcrime.securesms.jobs.MarkerJob; +import org.thoughtcrime.securesms.logging.Log; +import org.whispersystems.libsignal.util.guava.Optional; +import org.whispersystems.signalservice.api.SignalServiceMessageReceiver; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Implementations are responsible for fetching and processing a batch of messages. + */ +public abstract class MessageRetrievalStrategy { + + private volatile boolean canceled; + + /** + * Fetches and processes any pending messages. This method should block until the messages are + * actually stored and processed -- not just retrieved. + * + * @param timeout Hint for how long this will run. The strategy will also be canceled after the + * timeout ends, but having the timeout available may be useful for setting things + * like socket timeouts. + * + * @return True if everything was successful up until cancelation, false otherwise. + */ + @WorkerThread + abstract boolean execute(long timeout); + + /** + * Marks the strategy as canceled. It is the responsibility of the implementation of + * {@link #execute(long)} to check {@link #isCanceled()} to know if execution should stop. + */ + void cancel() { + this.canceled = true; + } + + protected boolean isCanceled() { + return canceled; + } + + protected static void blockUntilQueueDrained(@NonNull String tag, @NonNull String queue, long timeoutMs) { + long startTime = System.currentTimeMillis(); + final JobManager jobManager = ApplicationDependencies.getJobManager(); + final MarkerJob markerJob = new MarkerJob(queue); + + Optional jobState = jobManager.runSynchronously(markerJob, timeoutMs); + + if (!jobState.isPresent()) { + Log.w(tag, "Timed out waiting for " + queue + " job(s) to finish!"); + } + + long endTime = System.currentTimeMillis(); + long duration = endTime - startTime; + + Log.d(tag, "Waited " + duration + " ms for the " + queue + " job(s) to finish."); + } + + protected static String timeSuffix(long startTime) { + return " (" + (System.currentTimeMillis() - startTime) + " ms elapsed)"; + } + + protected static class QueueFindingJobListener implements JobTracker.JobListener { + private final Set queues = new HashSet<>(); + + @Override + @AnyThread + public void onStateChanged(@NonNull Job job, @NonNull JobTracker.JobState jobState) { + synchronized (queues) { + queues.add(job.getParameters().getQueue()); + } + } + + @NonNull Set getQueues() { + synchronized (queues) { + return new HashSet<>(queues); + } + } + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/gcm/RestStrategy.java b/app/src/main/java/org/thoughtcrime/securesms/messages/RestStrategy.java similarity index 78% rename from app/src/main/java/org/thoughtcrime/securesms/gcm/RestStrategy.java rename to app/src/main/java/org/thoughtcrime/securesms/messages/RestStrategy.java index 7870025e7f..38dd9749a2 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/gcm/RestStrategy.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/RestStrategy.java @@ -1,12 +1,9 @@ -package org.thoughtcrime.securesms.gcm; +package org.thoughtcrime.securesms.messages; -import androidx.annotation.AnyThread; import androidx.annotation.NonNull; import androidx.annotation.WorkerThread; -import org.thoughtcrime.securesms.IncomingMessageProcessor; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; -import org.thoughtcrime.securesms.jobmanager.Job; import org.thoughtcrime.securesms.jobmanager.JobManager; import org.thoughtcrime.securesms.jobmanager.JobTracker; import org.thoughtcrime.securesms.jobs.MarkerJob; @@ -17,32 +14,27 @@ import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.signalservice.api.SignalServiceMessageReceiver; import java.io.IOException; -import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * Retrieves messages over the REST endpoint. */ -public class RestStrategy implements MessageRetriever.Strategy { +public class RestStrategy extends MessageRetrievalStrategy { private static final String TAG = Log.tag(RestStrategy.class); - private static final long SOCKET_TIMEOUT = TimeUnit.SECONDS.toMillis(10); - @WorkerThread @Override - public boolean run() { + public boolean execute(long timeout) { long startTime = System.currentTimeMillis(); JobManager jobManager = ApplicationDependencies.getJobManager(); QueueFindingJobListener queueListener = new QueueFindingJobListener(); try (IncomingMessageProcessor.Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) { - int jobCount = enqueuePushDecryptJobs(processor, startTime); + int jobCount = enqueuePushDecryptJobs(processor, startTime, timeout); if (jobCount == 0) { Log.d(TAG, "No PushDecryptMessageJobs were enqueued."); @@ -82,13 +74,13 @@ public class RestStrategy implements MessageRetriever.Strategy { } } - private static int enqueuePushDecryptJobs(IncomingMessageProcessor.Processor processor, long startTime) + private static int enqueuePushDecryptJobs(IncomingMessageProcessor.Processor processor, long startTime, long timeout) throws IOException { SignalServiceMessageReceiver receiver = ApplicationDependencies.getSignalServiceMessageReceiver(); AtomicInteger jobCount = new AtomicInteger(0); - receiver.setSoTimeoutMillis(SOCKET_TIMEOUT); + receiver.setSoTimeoutMillis(timeout); receiver.retrieveMessages(envelope -> { Log.i(TAG, "Retrieved an envelope." + timeSuffix(startTime)); @@ -103,7 +95,6 @@ public class RestStrategy implements MessageRetriever.Strategy { return jobCount.get(); } - private static long blockUntilQueueDrained(@NonNull String queue, long timeoutMs) { long startTime = System.currentTimeMillis(); final JobManager jobManager = ApplicationDependencies.getJobManager(); @@ -122,30 +113,8 @@ public class RestStrategy implements MessageRetriever.Strategy { return timeoutMs - duration; } - private static String timeSuffix(long startTime) { - return " (" + (System.currentTimeMillis() - startTime) + " ms elapsed)"; - } - @Override public @NonNull String toString() { return RestStrategy.class.getSimpleName(); } - - private static class QueueFindingJobListener implements JobTracker.JobListener { - private final Set queues = new HashSet<>(); - - @Override - @AnyThread - public void onStateChanged(@NonNull Job job, @NonNull JobTracker.JobState jobState) { - synchronized (queues) { - queues.add(job.getParameters().getQueue()); - } - } - - @NonNull Set getQueues() { - synchronized (queues) { - return new HashSet<>(queues); - } - } - } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/WebsocketStrategy.java b/app/src/main/java/org/thoughtcrime/securesms/messages/WebsocketStrategy.java new file mode 100644 index 0000000000..1f7bc0a54f --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/WebsocketStrategy.java @@ -0,0 +1,95 @@ +package org.thoughtcrime.securesms.messages; + +import androidx.annotation.NonNull; + +import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; +import org.thoughtcrime.securesms.jobmanager.JobManager; +import org.thoughtcrime.securesms.jobs.PushProcessMessageJob; +import org.thoughtcrime.securesms.logging.Log; +import org.whispersystems.libsignal.InvalidVersionException; +import org.whispersystems.libsignal.util.guava.Optional; +import org.whispersystems.signalservice.api.SignalServiceMessagePipe; +import org.whispersystems.signalservice.api.SignalServiceMessageReceiver; +import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +class WebsocketStrategy extends MessageRetrievalStrategy { + + private static final String TAG = Log.tag(WebsocketStrategy.class); + + private final SignalServiceMessageReceiver receiver; + private final JobManager jobManager; + + public WebsocketStrategy() { + this.receiver = ApplicationDependencies.getSignalServiceMessageReceiver(); + this.jobManager = ApplicationDependencies.getJobManager(); + } + + @Override + public boolean execute(long timeout) { + long startTime = System.currentTimeMillis(); + + try { + Set processJobQueues = drainWebsocket(timeout, startTime); + Iterator queueIterator = processJobQueues.iterator(); + long timeRemaining = Math.max(0, timeout - (System.currentTimeMillis() - startTime)); + + while (!isCanceled() && queueIterator.hasNext() && timeRemaining > 0) { + String queue = queueIterator.next(); + + blockUntilQueueDrained(TAG, queue, timeRemaining); + + timeRemaining = Math.max(0, timeout - (System.currentTimeMillis() - startTime)); + } + + return true; + } catch (IOException e) { + Log.w(TAG, "Encountered an exception while draining the websocket.", e); + return false; + } + } + + private @NonNull Set drainWebsocket(long timeout, long startTime) throws IOException { + SignalServiceMessagePipe pipe = receiver.createMessagePipe(); + QueueFindingJobListener queueListener = new QueueFindingJobListener(); + + jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener); + + try { + while (shouldContinue()) { + try { + Optional result = pipe.readOrEmpty(timeout, TimeUnit.MILLISECONDS, envelope -> { + Log.i(TAG, "Retrieved envelope! " + envelope.getTimestamp() + timeSuffix(startTime)); + try (IncomingMessageProcessor.Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) { + processor.processEnvelope(envelope); + } + }); + + if (!result.isPresent()) { + Log.i(TAG, "Hit an empty response. Finished." + timeSuffix(startTime)); + break; + } + } catch (TimeoutException e) { + Log.w(TAG, "Websocket timeout." + timeSuffix(startTime)); + } catch (InvalidVersionException e) { + Log.w(TAG, e); + } + } + } finally { + pipe.shutdown(); + jobManager.removeListener(queueListener); + } + + return queueListener.getQueues(); + } + + + private boolean shouldContinue() { + return !isCanceled(); + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/notifications/MessageNotifier.java b/app/src/main/java/org/thoughtcrime/securesms/notifications/MessageNotifier.java index d8fed8cc9e..93f1bfc4ef 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/notifications/MessageNotifier.java +++ b/app/src/main/java/org/thoughtcrime/securesms/notifications/MessageNotifier.java @@ -55,12 +55,13 @@ import org.thoughtcrime.securesms.database.model.MediaMmsMessageRecord; import org.thoughtcrime.securesms.database.model.MessageRecord; import org.thoughtcrime.securesms.database.model.MmsMessageRecord; import org.thoughtcrime.securesms.database.model.ReactionRecord; +import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.logging.Log; import org.thoughtcrime.securesms.mms.Slide; import org.thoughtcrime.securesms.mms.SlideDeck; import org.thoughtcrime.securesms.recipients.Recipient; import org.thoughtcrime.securesms.recipients.RecipientUtil; -import org.thoughtcrime.securesms.service.IncomingMessageObserver; +import org.thoughtcrime.securesms.messages.IncomingMessageObserver; import org.thoughtcrime.securesms.service.KeyCachingService; import org.thoughtcrime.securesms.util.MediaUtil; import org.thoughtcrime.securesms.util.MessageRecordUtil; diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/ProfileUtil.java b/app/src/main/java/org/thoughtcrime/securesms/util/ProfileUtil.java index 800a3f0c2a..6dddbe2340 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/util/ProfileUtil.java +++ b/app/src/main/java/org/thoughtcrime/securesms/util/ProfileUtil.java @@ -14,7 +14,7 @@ import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.logging.Log; import org.thoughtcrime.securesms.recipients.Recipient; import org.thoughtcrime.securesms.recipients.RecipientUtil; -import org.thoughtcrime.securesms.service.IncomingMessageObserver; +import org.thoughtcrime.securesms.messages.IncomingMessageObserver; import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.signalservice.api.SignalServiceMessagePipe; import org.whispersystems.signalservice.api.SignalServiceMessageReceiver; diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java index bfa3079e24..09cdac1985 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java @@ -108,6 +108,26 @@ public class SignalServiceMessagePipe { */ public SignalServiceEnvelope read(long timeout, TimeUnit unit, MessagePipeCallback callback) throws TimeoutException, IOException, InvalidVersionException + { + while (true) { + Optional envelope = readOrEmpty(timeout, unit, callback); + + if (envelope.isPresent()) { + return envelope.get(); + } + } + } + + /** + * Similar to {@link #read(long, TimeUnit, MessagePipeCallback)}, except this will return + * {@link Optional#absent()} when an empty response is hit, which indicates the websocket is + * empty. + * + * Important: The empty response will only be hit once for each instance of {@link SignalServiceMessagePipe}. + * That means subsequent calls will block until an envelope is available. + */ + public Optional readOrEmpty(long timeout, TimeUnit unit, MessagePipeCallback callback) + throws TimeoutException, IOException, InvalidVersionException { if (!credentialsProvider.isPresent()) { throw new IllegalArgumentException("You can't read messages if you haven't specified credentials"); @@ -125,7 +145,9 @@ public class SignalServiceMessagePipe { signalKeyEncrypted); callback.onMessage(envelope); - return envelope; + return Optional.of(envelope); + } else if (isSocketEmptyRequest(request)) { + return Optional.absent(); } } finally { websocket.sendResponse(response); @@ -275,6 +297,10 @@ public class SignalServiceMessagePipe { return "PUT".equals(message.getVerb()) && "/api/v1/message".equals(message.getPath()); } + private boolean isSocketEmptyRequest(WebSocketRequestMessage message) { + return "PUT".equals(message.getVerb()) && "/api/v1/queue/empty".equals(message.getPath()); + } + private boolean isSignalKeyEncrypted(WebSocketRequestMessage message) { List headers = message.getHeadersList(); @@ -315,8 +341,8 @@ public class SignalServiceMessagePipe { * For receiving a callback when a new message has been * received. */ - public static interface MessagePipeCallback { - public void onMessage(SignalServiceEnvelope envelope); + public interface MessagePipeCallback { + void onMessage(SignalServiceEnvelope envelope); } private static class NullMessagePipeCallback implements MessagePipeCallback {