From 662f0b8fb60e23999b580618584e0c0b9c6bce94 Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Tue, 21 Jul 2020 10:38:42 -0400 Subject: [PATCH] Improve detection of websocket drained status. Will now work when you lose and regain network. Also removes the unnecessary InitialMessageRetriever. --- .../securesms/ApplicationContext.java | 36 +---- .../dependencies/ApplicationDependencies.java | 26 ++- .../ApplicationDependencyProvider.java | 10 +- .../impl/WebsocketDrainedConstraint.java | 4 +- .../WebsocketDrainedConstraintObserver.java | 6 +- .../messages/BackgroundMessageRetriever.java | 18 +-- .../messages/IncomingMessageObserver.java | 113 ++++++++----- .../messages/InitialMessageRetriever.java | 151 ------------------ .../api/SignalServiceMessagePipe.java | 6 +- .../websocket/WebSocketConnection.java | 2 + 10 files changed, 106 insertions(+), 266 deletions(-) delete mode 100644 app/src/main/java/org/thoughtcrime/securesms/messages/InitialMessageRetriever.java diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index 2a39899a5c..1487275724 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -51,7 +51,6 @@ import org.thoughtcrime.securesms.logging.CustomSignalProtocolLogger; import org.thoughtcrime.securesms.logging.Log; import org.thoughtcrime.securesms.logging.PersistentLogger; import org.thoughtcrime.securesms.logging.SignalUncaughtExceptionHandler; -import org.thoughtcrime.securesms.messages.InitialMessageRetriever; import org.thoughtcrime.securesms.migrations.ApplicationMigrations; import org.thoughtcrime.securesms.notifications.NotificationChannels; import org.thoughtcrime.securesms.providers.BlobProvider; @@ -61,7 +60,6 @@ import org.thoughtcrime.securesms.revealable.ViewOnceMessageManager; import org.thoughtcrime.securesms.ringrtc.RingRtcLogger; import org.thoughtcrime.securesms.service.DirectoryRefreshListener; import org.thoughtcrime.securesms.service.ExpiringMessageManager; -import org.thoughtcrime.securesms.messages.IncomingMessageObserver; import org.thoughtcrime.securesms.service.KeyCachingService; import org.thoughtcrime.securesms.service.LocalBackupListener; import org.thoughtcrime.securesms.service.RotateSenderCertificateListener; @@ -97,7 +95,6 @@ public class ApplicationContext extends MultiDexApplication implements DefaultLi private ViewOnceMessageManager viewOnceMessageManager; private TypingStatusRepository typingStatusRepository; private TypingStatusSender typingStatusSender; - private IncomingMessageObserver incomingMessageObserver; private PersistentLogger persistentLogger; private volatile boolean isAppVisible; @@ -157,7 +154,6 @@ public class ApplicationContext extends MultiDexApplication implements DefaultLi KeyCachingService.onAppForegrounded(this); ApplicationDependencies.getFrameRateTracker().begin(); ApplicationDependencies.getMegaphoneRepository().onAppForegrounded(); - catchUpOnMessages(); } @Override @@ -234,7 +230,7 @@ public class ApplicationContext extends MultiDexApplication implements DefaultLi } public void initializeMessageRetrieval() { - this.incomingMessageObserver = new IncomingMessageObserver(this); + ApplicationDependencies.getIncomingMessageObserver(); } private void initializeAppDependencies() { @@ -382,36 +378,6 @@ public class ApplicationContext extends MultiDexApplication implements DefaultLi }); } - private void catchUpOnMessages() { - InitialMessageRetriever retriever = ApplicationDependencies.getInitialMessageRetriever(); - - if (retriever.isCaughtUp()) { - return; - } - - SignalExecutors.UNBOUNDED.execute(() -> { - long startTime = System.currentTimeMillis(); - - switch (retriever.begin(TimeUnit.SECONDS.toMillis(60))) { - case SUCCESS: - Log.i(TAG, "Successfully caught up on messages. " + (System.currentTimeMillis() - startTime) + " ms"); - break; - case FAILURE_TIMEOUT: - Log.w(TAG, "Did not finish catching up due to a timeout. " + (System.currentTimeMillis() - startTime) + " ms"); - break; - case FAILURE_ERROR: - Log.w(TAG, "Did not finish catching up due to an error. " + (System.currentTimeMillis() - startTime) + " ms"); - break; - case SKIPPED_ALREADY_CAUGHT_UP: - Log.i(TAG, "Already caught up. " + (System.currentTimeMillis() - startTime) + " ms"); - break; - case SKIPPED_ALREADY_RUNNING: - Log.i(TAG, "Already in the process of catching up. " + (System.currentTimeMillis() - startTime) + " ms"); - break; - } - }); - } - @Override protected void attachBaseContext(Context base) { super.attachBaseContext(DynamicLanguageContextWrapper.updateContext(base, TextSecurePreferences.getLanguage(base))); diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java index 93a6405e6c..e52635a7f4 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java @@ -14,8 +14,6 @@ import org.thoughtcrime.securesms.jobmanager.JobManager; import org.thoughtcrime.securesms.keyvalue.KeyValueStore; import org.thoughtcrime.securesms.keyvalue.SignalStore; import org.thoughtcrime.securesms.megaphone.MegaphoneRepository; -import org.thoughtcrime.securesms.messages.InitialMessageRetriever; -import org.thoughtcrime.securesms.notifications.DefaultMessageNotifier; import org.thoughtcrime.securesms.notifications.MessageNotifier; import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess; import org.thoughtcrime.securesms.recipients.LiveRecipientCache; @@ -48,6 +46,7 @@ public class ApplicationDependencies { private static SignalServiceAccountManager accountManager; private static SignalServiceMessageSender messageSender; private static SignalServiceMessageReceiver messageReceiver; + private static IncomingMessageObserver incomingMessageObserver; private static IncomingMessageProcessor incomingMessageProcessor; private static BackgroundMessageRetriever backgroundMessageRetriever; private static LiveRecipientCache recipientCache; @@ -59,7 +58,6 @@ public class ApplicationDependencies { private static GroupsV2StateProcessor groupsV2StateProcessor; private static GroupsV2Operations groupsV2Operations; private static EarlyMessageCache earlyMessageCache; - private static InitialMessageRetriever initialMessageRetriever; private static MessageNotifier messageNotifier; @MainThread @@ -242,21 +240,21 @@ public class ApplicationDependencies { return earlyMessageCache; } - public static synchronized @NonNull InitialMessageRetriever getInitialMessageRetriever() { - assertInitialization(); - - if (initialMessageRetriever == null) { - initialMessageRetriever = provider.provideInitialMessageRetriever(); - } - - return initialMessageRetriever; - } - public static synchronized @NonNull MessageNotifier getMessageNotifier() { assertInitialization(); return messageNotifier; } + public static synchronized @NonNull IncomingMessageObserver getIncomingMessageObserver() { + assertInitialization(); + + if (incomingMessageObserver == null) { + incomingMessageObserver = provider.provideIncomingMessageObserver(); + } + + return incomingMessageObserver; + } + private static void assertInitialization() { if (application == null || provider == null) { throw new UninitializedException(); @@ -277,8 +275,8 @@ public class ApplicationDependencies { @NonNull KeyValueStore provideKeyValueStore(); @NonNull MegaphoneRepository provideMegaphoneRepository(); @NonNull EarlyMessageCache provideEarlyMessageCache(); - @NonNull InitialMessageRetriever provideInitialMessageRetriever(); @NonNull MessageNotifier provideMessageNotifier(); + @NonNull IncomingMessageObserver provideIncomingMessageObserver(); } private static class UninitializedException extends IllegalStateException { diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java index 7fab281fa1..f5436f7a19 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java @@ -29,7 +29,6 @@ import org.thoughtcrime.securesms.jobs.JobManagerFactories; import org.thoughtcrime.securesms.keyvalue.KeyValueStore; import org.thoughtcrime.securesms.logging.Log; import org.thoughtcrime.securesms.megaphone.MegaphoneRepository; -import org.thoughtcrime.securesms.messages.InitialMessageRetriever; import org.thoughtcrime.securesms.notifications.DefaultMessageNotifier; import org.thoughtcrime.securesms.notifications.MessageNotifier; import org.thoughtcrime.securesms.notifications.OptimizedMessageNotifier; @@ -55,7 +54,6 @@ import org.whispersystems.signalservice.api.util.UptimeSleepTimer; import org.whispersystems.signalservice.api.websocket.ConnectivityListener; import java.util.UUID; -import java.util.concurrent.Executors; /** * Implementation of {@link ApplicationDependencies.Provider} that provides real app dependencies. @@ -171,13 +169,13 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr } @Override - public @NonNull InitialMessageRetriever provideInitialMessageRetriever() { - return new InitialMessageRetriever(); + public @NonNull MessageNotifier provideMessageNotifier() { + return new OptimizedMessageNotifier(new DefaultMessageNotifier()); } @Override - public @NonNull MessageNotifier provideMessageNotifier() { - return new OptimizedMessageNotifier(new DefaultMessageNotifier()); + public @NonNull IncomingMessageObserver provideIncomingMessageObserver() { + return new IncomingMessageObserver(context); } private static class DynamicCredentialsProvider implements CredentialsProvider { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/WebsocketDrainedConstraint.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/WebsocketDrainedConstraint.java index a3b1de0040..968d373121 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/WebsocketDrainedConstraint.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/WebsocketDrainedConstraint.java @@ -10,7 +10,7 @@ import org.thoughtcrime.securesms.jobmanager.Constraint; /** * A constraint that is met once we have pulled down all messages from the websocket during initial - * load. See {@link org.thoughtcrime.securesms.messages.InitialMessageRetriever}. + * load. See {@link org.thoughtcrime.securesms.messages.IncomingMessageObserver}. */ public final class WebsocketDrainedConstraint implements Constraint { @@ -21,7 +21,7 @@ public final class WebsocketDrainedConstraint implements Constraint { @Override public boolean isMet() { - return ApplicationDependencies.getInitialMessageRetriever().isCaughtUp(); + return ApplicationDependencies.getIncomingMessageObserver().isWebsocketDrained(); } @Override diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/WebsocketDrainedConstraintObserver.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/WebsocketDrainedConstraintObserver.java index 7df093f2ea..2ca38ce4c4 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/WebsocketDrainedConstraintObserver.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/WebsocketDrainedConstraintObserver.java @@ -6,8 +6,8 @@ import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.jobmanager.ConstraintObserver; /** - * An observer for {@link WebsocketDrainedConstraint}. Will fire when the - * {@link org.thoughtcrime.securesms.messages.InitialMessageRetriever} is caught up. + * An observer for {@link WebsocketDrainedConstraint}. Will fire when the websocket is drained + * (i.e. it has received an empty response). */ public class WebsocketDrainedConstraintObserver implements ConstraintObserver { @@ -16,7 +16,7 @@ public class WebsocketDrainedConstraintObserver implements ConstraintObserver { private volatile Notifier notifier; public WebsocketDrainedConstraintObserver() { - ApplicationDependencies.getInitialMessageRetriever().addListener(() -> { + ApplicationDependencies.getIncomingMessageObserver().addWebsocketDrainedListener(() -> { if (notifier != null) { notifier.onConstraintMet(REASON); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/BackgroundMessageRetriever.java b/app/src/main/java/org/thoughtcrime/securesms/messages/BackgroundMessageRetriever.java index 08a2fa8372..6084c0b312 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/BackgroundMessageRetriever.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/BackgroundMessageRetriever.java @@ -29,7 +29,6 @@ public class BackgroundMessageRetriever { private static final Semaphore ACTIVE_LOCK = new Semaphore(2); - private static final long CATCHUP_TIMEOUT = TimeUnit.SECONDS.toMillis(60); private static final long NORMAL_TIMEOUT = TimeUnit.SECONDS.toMillis(10); /** @@ -64,21 +63,8 @@ public class BackgroundMessageRetriever { Log.w(TAG, "We may be operating in a constrained environment. Doze: " + doze + " Network: " + network); } - if (ApplicationDependencies.getInitialMessageRetriever().isCaughtUp()) { - Log.i(TAG, "Performing normal message fetch."); - return executeBackgroundRetrieval(context, startTime, strategies); - } else { - Log.i(TAG, "Performing initial message fetch."); - InitialMessageRetriever.Result result = ApplicationDependencies.getInitialMessageRetriever().begin(CATCHUP_TIMEOUT); - if (result == InitialMessageRetriever.Result.SUCCESS) { - Log.i(TAG, "Initial message request was completed successfully. " + logSuffix(startTime)); - TextSecurePreferences.setNeedsMessagePull(context, false); - return true; - } else { - Log.w(TAG, "Initial message fetch returned result " + result + ", so doing a normal message fetch."); - return executeBackgroundRetrieval(context, System.currentTimeMillis(), strategies); - } - } + Log.i(TAG, "Performing normal message fetch."); + return executeBackgroundRetrieval(context, startTime, strategies); } finally { WakeLockUtil.release(wakeLock, WAKE_LOCK_TAG); ACTIVE_LOCK.release(); diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java index b9cf57a229..e2dabf74cb 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java @@ -4,8 +4,12 @@ import android.app.Service; import androidx.lifecycle.DefaultLifecycleObserver; import androidx.lifecycle.LifecycleOwner; import androidx.lifecycle.ProcessLifecycleOwner; + +import android.content.BroadcastReceiver; import android.content.Context; import android.content.Intent; +import android.content.IntentFilter; +import android.net.ConnectivityManager; import android.os.IBinder; import androidx.annotation.NonNull; import androidx.annotation.Nullable; @@ -25,13 +29,17 @@ import org.thoughtcrime.securesms.notifications.NotificationChannels; import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess; import org.thoughtcrime.securesms.util.TextSecurePreferences; import org.whispersystems.libsignal.InvalidVersionException; +import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.signalservice.api.SignalServiceMessagePipe; import org.whispersystems.signalservice.api.SignalServiceMessageReceiver; +import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -public class IncomingMessageObserver implements ConstraintObserver.Notifier { +public class IncomingMessageObserver { private static final String TAG = IncomingMessageObserver.class.getSimpleName(); @@ -41,19 +49,19 @@ public class IncomingMessageObserver implements ConstraintObserver.Notifier { private static SignalServiceMessagePipe pipe = null; private static SignalServiceMessagePipe unidentifiedPipe = null; - private final Context context; - private final NetworkConstraint networkConstraint; - private final SignalServiceNetworkAccess networkAccess; + private final Context context; + private final SignalServiceNetworkAccess networkAccess; + private final List websocketDrainedListeners; private boolean appVisible; + private volatile boolean websocketDrained; public IncomingMessageObserver(@NonNull Context context) { - this.context = context; - this.networkConstraint = new NetworkConstraint.Factory(ApplicationContext.getInstance(context)).create(); - this.networkAccess = ApplicationDependencies.getSignalServiceNetworkAccess(); + this.context = context; + this.networkAccess = ApplicationDependencies.getSignalServiceNetworkAccess(); + this.websocketDrainedListeners = new CopyOnWriteArrayList<>(); - new NetworkConstraintObserver(ApplicationContext.getInstance(context)).register(this); new MessageRetrievalThread().start(); if (TextSecurePreferences.isFcmDisabled(context)) { @@ -72,16 +80,32 @@ public class IncomingMessageObserver implements ConstraintObserver.Notifier { } }); - ApplicationDependencies.getInitialMessageRetriever().addListener(this::onInitialRetrievalComplete); + context.registerReceiver(new BroadcastReceiver() { + @Override + public void onReceive(Context context, Intent intent) { + synchronized (IncomingMessageObserver.this) { + if (!NetworkConstraint.isMet(context)) { + Log.w(TAG, "Lost network connection. Shutting down our websocket connections and resetting the drained state."); + websocketDrained = false; + shutdown(pipe, unidentifiedPipe); + } + IncomingMessageObserver.this.notifyAll(); + } + } + }, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION)); } - @Override - public void onConstraintMet(@NonNull String reason) { - synchronized (this) { - notifyAll(); + public synchronized void addWebsocketDrainedListener(@NonNull Runnable listener) { + websocketDrainedListeners.add(listener); + if (websocketDrained) { + listener.run(); } } + public boolean isWebsocketDrained() { + return websocketDrained; + } + private synchronized void onAppForegrounded() { appVisible = true; notifyAll(); @@ -92,21 +116,19 @@ public class IncomingMessageObserver implements ConstraintObserver.Notifier { notifyAll(); } - private synchronized void onInitialRetrievalComplete() { - notifyAll(); - } - private synchronized boolean isConnectionNecessary() { - boolean isGcmDisabled = TextSecurePreferences.isFcmDisabled(context); + boolean registered = TextSecurePreferences.isPushRegistered(context); + boolean websocketRegistered = TextSecurePreferences.isWebsocketRegistered(context); + boolean isGcmDisabled = TextSecurePreferences.isFcmDisabled(context); + boolean hasNetwork = NetworkConstraint.isMet(context); - Log.d(TAG, String.format("Network requirement: %s, app visible: %s, gcm disabled: %b", - networkConstraint.isMet(), appVisible, isGcmDisabled)); + Log.d(TAG, String.format("Network: %s, Foreground: %s, FCM: %s, Censored: %s, Registered: %s, Websocket Registered: %s", + hasNetwork, appVisible, !isGcmDisabled, networkAccess.isCensored(context), registered, websocketRegistered)); - return TextSecurePreferences.isPushRegistered(context) && - TextSecurePreferences.isWebsocketRegistered(context) && - (appVisible || isGcmDisabled) && - networkConstraint.isMet() && - ApplicationDependencies.getInitialMessageRetriever().isCaughtUp() && + return registered && + websocketRegistered && + (appVisible || isGcmDisabled) && + hasNetwork && !networkAccess.isCensored(context); } @@ -118,12 +140,21 @@ public class IncomingMessageObserver implements ConstraintObserver.Notifier { } } - private void shutdown(SignalServiceMessagePipe pipe, SignalServiceMessagePipe unidentifiedPipe) { + private void shutdown(@Nullable SignalServiceMessagePipe pipe, @Nullable SignalServiceMessagePipe unidentifiedPipe) { try { - pipe.shutdown(); - unidentifiedPipe.shutdown(); + if (pipe != null) { + pipe.shutdown(); + } } catch (Throwable t) { - Log.w(TAG, t); + Log.w(TAG, "Closing normal pipe failed!", t); + } + + try { + if (unidentifiedPipe != null) { + unidentifiedPipe.shutdown(); + } + } catch (Throwable t) { + Log.w(TAG, "Closing unidentified pipe failed!", t); } } @@ -160,14 +191,22 @@ public class IncomingMessageObserver implements ConstraintObserver.Notifier { try { while (isConnectionNecessary()) { try { - Log.i(TAG, "Reading message..."); - localPipe.read(REQUEST_TIMEOUT_MINUTES, TimeUnit.MINUTES, - envelope -> { - Log.i(TAG, "Retrieved envelope! " + envelope.getTimestamp()); - try (Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) { - processor.processEnvelope(envelope); - } - }); + Log.d(TAG, "Reading message..."); + Optional result = localPipe.readOrEmpty(REQUEST_TIMEOUT_MINUTES, TimeUnit.MINUTES, envelope -> { + Log.i(TAG, "Retrieved envelope! " + envelope.getTimestamp()); + try (Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) { + processor.processEnvelope(envelope); + } + }); + + if (!result.isPresent() && !websocketDrained) { + Log.i(TAG, "Websocket was newly-drained. Triggering listeners."); + websocketDrained = true; + + for (Runnable listener : websocketDrainedListeners) { + listener.run(); + } + } } catch (TimeoutException e) { Log.w(TAG, "Application level read timeout..."); } catch (InvalidVersionException e) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/InitialMessageRetriever.java b/app/src/main/java/org/thoughtcrime/securesms/messages/InitialMessageRetriever.java deleted file mode 100644 index 2bf01eee35..0000000000 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/InitialMessageRetriever.java +++ /dev/null @@ -1,151 +0,0 @@ -package org.thoughtcrime.securesms.messages; - -import android.content.Context; - -import androidx.annotation.NonNull; -import androidx.annotation.WorkerThread; - -import org.thoughtcrime.securesms.ApplicationContext; -import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; -import org.thoughtcrime.securesms.logging.Log; -import org.thoughtcrime.securesms.util.concurrent.SignalExecutors; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * Fetches the first batch of messages, before anything else does. - * - * We have a separate process for fetching "initial" messages in order to have special behavior when - * catching up on a lot of messages after being offline for a while. It also gives us an opportunity - * to flag when we are "up-to-date" with our message queue. - */ -public class InitialMessageRetriever { - - private static final String TAG = Log.tag(InitialMessageRetriever.class); - - private static final int MAX_ATTEMPTS = 3; - - private final List listeners = new CopyOnWriteArrayList<>(); - - private State state = State.NOT_CAUGHT_UP; - - private final Object STATE_LOCK = new Object(); - - /** - * Only fires once. No need to remove. It will be called on an arbitrary worker thread. - */ - public void addListener(@NonNull Listener listener) { - synchronized (STATE_LOCK) { - if (state == State.CAUGHT_UP) { - listener.onCaughtUp(); - } else { - listeners.add(listener); - } - } - } - - /** - * Performs the initial fetch for messages (if necessary) with the requested timeout. The timeout - * is not just for the initial network request, but for the entire method call. - * - * @return A result describing how the operation completed. - */ - @WorkerThread - public @NonNull Result begin(long timeout) { - synchronized (STATE_LOCK) { - if (state == State.CAUGHT_UP) { - return Result.SKIPPED_ALREADY_CAUGHT_UP; - } else if (state == State.RUNNING) { - return Result.SKIPPED_ALREADY_RUNNING; - } - - state = State.RUNNING; - } - - long startTime = System.currentTimeMillis(); - - MessageRetrievalStrategy messageRetrievalStrategy = getRetriever(); - CountDownLatch latch = new CountDownLatch(1); - - SignalExecutors.UNBOUNDED.execute(() -> { - for (int i = 0; i < MAX_ATTEMPTS; i++) { - if (messageRetrievalStrategy.isCanceled()) { - Log.w(TAG, "Invalidated! Ending attempts."); - break; - } - - boolean success = getRetriever().execute(timeout); - - if (success) { - break; - } else { - Log.w(TAG, "Failed to catch up! Attempt " + (i + 1) + "/" + MAX_ATTEMPTS); - } - } - - latch.countDown(); - }); - - try { - boolean success = latch.await(timeout, TimeUnit.MILLISECONDS); - - synchronized (STATE_LOCK) { - state = State.CAUGHT_UP; - - for (Listener listener : listeners) { - listener.onCaughtUp(); - } - - listeners.clear(); - } - - ApplicationDependencies.getMessageNotifier().updateNotification(ApplicationDependencies.getApplication()); - - if (success) { - Log.i(TAG, "Successfully caught up in " + (System.currentTimeMillis() - startTime) + " ms."); - return Result.SUCCESS; - } else { - Log.i(TAG, "Could not catch up completely. Hit the timeout of " + timeout + " ms."); - messageRetrievalStrategy.cancel(); - return Result.FAILURE_TIMEOUT; - } - } catch (InterruptedException e) { - Log.w(TAG, "Interrupted!", e); - return Result.FAILURE_ERROR; - } - } - - public boolean isCaughtUp() { - synchronized (STATE_LOCK) { - return state == State.CAUGHT_UP; - } - } - - private @NonNull MessageRetrievalStrategy getRetriever() { - Context context = ApplicationDependencies.getApplication(); - - if (ApplicationContext.getInstance(context).isAppVisible() && - !ApplicationDependencies.getSignalServiceNetworkAccess().isCensored(context)) - { - return new WebsocketStrategy(); - } else { - return new RestStrategy(); - } - } - - private enum State { - NOT_CAUGHT_UP, RUNNING, CAUGHT_UP - } - - public enum Result { - SUCCESS, FAILURE_TIMEOUT, FAILURE_ERROR, SKIPPED_ALREADY_CAUGHT_UP, SKIPPED_ALREADY_RUNNING - } - - public interface Listener { - @WorkerThread - void onCaughtUp(); - } -} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java index 6c396c30ab..a1ea81d971 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java @@ -132,8 +132,10 @@ public class SignalServiceMessagePipe { * {@link Optional#absent()} when an empty response is hit, which indicates the websocket is * empty. * - * Important: The empty response will only be hit once for each instance of {@link SignalServiceMessagePipe}. - * That means subsequent calls will block until an envelope is available. + * Important: The empty response will only be hit once for each connection. That means if you get + * an empty response and call readOrEmpty() again on the same instance, you will not get an empty + * response, and instead will block until you get an actual message. This will, however, reset if + * connection breaks (if, for instance, you lose and regain network). */ public Optional readOrEmpty(long timeout, TimeUnit unit, MessagePipeCallback callback) throws TimeoutException, IOException, InvalidVersionException diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java index 206760de85..02a7be40c6 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java @@ -150,6 +150,8 @@ public class WebSocketConnection extends WebSocketListener { keepAliveSender.shutdown(); keepAliveSender = null; } + + notifyAll(); } public synchronized WebSocketRequestMessage readRequest(long timeoutMillis)