diff --git a/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java b/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java index a30aa3e87e..b784acdc57 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java +++ b/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java @@ -1,32 +1,27 @@ package org.thoughtcrime.securesms.notifications; import android.content.Context; +import android.os.Handler; import androidx.annotation.MainThread; import androidx.annotation.NonNull; -import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; -import org.thoughtcrime.securesms.messages.InitialMessageRetriever; import org.thoughtcrime.securesms.recipients.Recipient; -import org.thoughtcrime.securesms.util.Throttler; - -import java.util.concurrent.TimeUnit; +import org.thoughtcrime.securesms.util.LeakyBucketLimiter; +import org.thoughtcrime.securesms.util.concurrent.SignalExecutors; /** - * Wraps another {@link MessageNotifier} and throttles it while {@link InitialMessageRetriever} is - * running. + * Uses a leaky-bucket strategy to limiting notification updates. */ public class OptimizedMessageNotifier implements MessageNotifier { - private final MessageNotifier wrapped; - private final Throttler throttler; - private final InitialMessageRetriever retriever; + private final MessageNotifier wrapped; + private final LeakyBucketLimiter limiter; @MainThread public OptimizedMessageNotifier(@NonNull MessageNotifier wrapped) { - this.wrapped = wrapped; - this.throttler = new Throttler(TimeUnit.SECONDS.toMillis(5)); - this.retriever = ApplicationDependencies.getInitialMessageRetriever(); + this.wrapped = wrapped; + this.limiter = new LeakyBucketLimiter(5, 1000, new Handler(SignalExecutors.getAndStartHandlerThread("signal-notifier").getLooper())); } @Override @@ -56,38 +51,22 @@ public class OptimizedMessageNotifier implements MessageNotifier { @Override public void updateNotification(@NonNull Context context) { - if (retriever.isCaughtUp()) { - wrapped.updateNotification(context); - } else { - throttler.publish(() -> wrapped.updateNotification(context)); - } + limiter.run(() -> wrapped.updateNotification(context)); } @Override public void updateNotification(@NonNull Context context, long threadId) { - if (retriever.isCaughtUp()) { - wrapped.updateNotification(context, threadId); - } else { - throttler.publish(() -> wrapped.updateNotification(context)); - } + limiter.run(() -> wrapped.updateNotification(context, threadId)); } @Override public void updateNotification(@NonNull Context context, long threadId, boolean signal) { - if (retriever.isCaughtUp()) { - wrapped.updateNotification(context, threadId, signal); - } else { - throttler.publish(() -> wrapped.updateNotification(context)); - } + limiter.run(() -> wrapped.updateNotification(context, threadId, signal)); } @Override public void updateNotification(@NonNull Context context, long threadId, boolean signal, int reminderCount) { - if (retriever.isCaughtUp()) { - wrapped.updateNotification(context, threadId, signal, reminderCount); - } else { - throttler.publish(() -> wrapped.updateNotification(context)); - } + limiter.run(() -> wrapped.updateNotification(context, threadId, signal, reminderCount)); } @Override diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/LeakyBucketLimiter.java b/app/src/main/java/org/thoughtcrime/securesms/util/LeakyBucketLimiter.java new file mode 100644 index 0000000000..ca485088bf --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/util/LeakyBucketLimiter.java @@ -0,0 +1,107 @@ +package org.thoughtcrime.securesms.util; + +import android.os.Handler; + +import androidx.annotation.AnyThread; +import androidx.annotation.NonNull; +import androidx.core.os.HandlerCompat; + +import org.thoughtcrime.securesms.logging.Log; + +/** + * Imagine a bucket. Now imagine your tasks as little droplets. As your tasks are thrown into the + * bucket, the tasks are executed, and the bucket fills up. If the bucket is full, the tasks + * overflow and are discarded. + * + * However, the bucket has a leak! So it empties slowly over time, allowing you to put more tasks in + * if you're patient. + * + * This class lets you define a bucket with a given capacity and drip interval. Imagine you had a + * capacity of 10 and a drip interval of 1000ms. That means that you could execute 10 tasks in + * rapid succession, but afterwards you'd only be able to execute at most 1 task per second. If you + * waited 10 seconds, the bucket would be fully drained, and you'd be able to execute 10 tasks in + * rapid succession again. + * + * This class also does something a little extra -- it keeps track of the most-recently-overflowed + * task, and will run it the next time it 'drips' instead of leaking. This lets you have a sort of + * "throw tasks at the bucket and forget about it" attitude, because you know the task will + * eventually run. + * + * Of course, that's only if all of your tasks are equal! It's highly recommended, as with any sort + * of limiting construct, to only submit a series of equivalent or roughly-equivalent tasks. + * + * Using the assumption that all tasks are equal, this class will also remove any pending tasks that + * are waiting to run when a new one is enqueued. No point in causing a pile-up. + */ +public final class LeakyBucketLimiter { + + private static final String TAG = Log.tag(LeakyBucketLimiter.class); + + private final int bucketCapacity; + private final long dripInterval; + private final Handler handler; + + private int bucketLevel; + private Runnable lastOverflowedRunnable; + + private final Object RUNNABLE_TOKEN = new Object(); + + public LeakyBucketLimiter(int bucketCapacity, long dripInterval, @NonNull Handler handler) { + this.bucketCapacity = bucketCapacity; + this.dripInterval = dripInterval; + this.handler = handler; + } + + @AnyThread + public void run(@NonNull Runnable runnable) { + boolean shouldRun = false; + boolean scheduleDrip = false; + + synchronized (this) { + if (bucketLevel < bucketCapacity) { + bucketLevel++; + + shouldRun = true; + scheduleDrip = bucketLevel == 1; + } else { + lastOverflowedRunnable = runnable; + } + } + + if (shouldRun) { + handler.removeCallbacksAndMessages(RUNNABLE_TOKEN); + HandlerCompat.postDelayed(handler, runnable, RUNNABLE_TOKEN, 0); + } else { + Log.d(TAG, "Overflowed!"); + } + + if (scheduleDrip) { + handler.postDelayed(this::drip, dripInterval); + } + } + + private void drip() { + Runnable runnable = null; + boolean needsDrip = false; + + synchronized (this) { + if (lastOverflowedRunnable == null) { + bucketLevel = Math.max(bucketLevel - 1, 0); + } else { + Log.d(TAG, "Running most-recently-overflowed task."); + runnable = lastOverflowedRunnable; + lastOverflowedRunnable = null; + } + + needsDrip = bucketLevel > 0; + } + + if (runnable != null) { + runnable.run(); + } + + if (needsDrip) { + handler.postDelayed(this::drip, dripInterval); + } + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/concurrent/SignalExecutors.java b/app/src/main/java/org/thoughtcrime/securesms/util/concurrent/SignalExecutors.java index bcbff01d4f..93fe665a9e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/util/concurrent/SignalExecutors.java +++ b/app/src/main/java/org/thoughtcrime/securesms/util/concurrent/SignalExecutors.java @@ -1,5 +1,7 @@ package org.thoughtcrime.securesms.util.concurrent; +import android.os.HandlerThread; + import androidx.annotation.NonNull; import com.google.android.gms.common.util.concurrent.NumberedThreadFactory; @@ -76,6 +78,12 @@ public class SignalExecutors { return new ThreadPoolExecutor(minThreads, maxThreads, 0, TimeUnit.MILLISECONDS, new LinkedBlockingLifoQueue<>(), new NumberedThreadFactory(name)); } + public static HandlerThread getAndStartHandlerThread(@NonNull String name) { + HandlerThread handlerThread = new HandlerThread(name); + handlerThread.start(); + return handlerThread; + } + /** * Returns an 'ideal' thread count based on the number of available processors. */