diff --git a/src/org/thoughtcrime/securesms/gcm/GcmBroadcastReceiver.java b/src/org/thoughtcrime/securesms/gcm/GcmBroadcastReceiver.java index 1e51f0a317..34c78818aa 100644 --- a/src/org/thoughtcrime/securesms/gcm/GcmBroadcastReceiver.java +++ b/src/org/thoughtcrime/securesms/gcm/GcmBroadcastReceiver.java @@ -19,10 +19,12 @@ import org.thoughtcrime.securesms.jobs.PushNotificationReceiveJob; import org.thoughtcrime.securesms.service.GenericForegroundService; import org.thoughtcrime.securesms.util.PowerManagerCompat; import org.thoughtcrime.securesms.util.TextSecurePreferences; +import org.thoughtcrime.securesms.util.concurrent.SignalExecutors; import org.whispersystems.signalservice.api.SignalServiceMessageReceiver; import org.whispersystems.signalservice.internal.util.Util; import java.io.IOException; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import javax.inject.Inject; @@ -31,6 +33,10 @@ public class GcmBroadcastReceiver extends WakefulBroadcastReceiver implements In private static final String TAG = GcmBroadcastReceiver.class.getSimpleName(); + private static final Executor MESSAGE_EXECUTOR = SignalExecutors.newCachedSingleThreadExecutor("GcmMessageProcessing"); + + private static int activeCount = 0; + @Inject SignalServiceMessageReceiver messageReceiver; @Override @@ -62,6 +68,11 @@ public class GcmBroadcastReceiver extends WakefulBroadcastReceiver implements In } private void handleReceivedNotification(Context context) { + if (!incrementActiveGcmCount()) { + Log.i(TAG, "Skipping GCM processing -- there's already one enqueued."); + return; + } + TextSecurePreferences.setNeedsMessagePull(context, true); long startTime = System.currentTimeMillis(); @@ -81,29 +92,28 @@ public class GcmBroadcastReceiver extends WakefulBroadcastReceiver implements In callback.finish(); } - new Thread("GcmMessageProcessing") { - @Override - public void run() { - try { - new PushNotificationReceiveJob(context).pullAndProcessMessages(messageReceiver, TAG, startTime); - } catch (IOException e) { - Log.i(TAG, "Failed to retrieve the envelope. Scheduling on JobManager.", e); - ApplicationContext.getInstance(context) - .getJobManager() - .add(new PushNotificationReceiveJob(context)); - } finally { - synchronized (foregroundLock) { - if (foregroundRunning.getAndSet(false)) { - GenericForegroundService.stopForegroundTask(context); - } else { - callback.finish(); - } - taskCompleted.set(true); + MESSAGE_EXECUTOR.execute(() -> { + try { + new PushNotificationReceiveJob(context).pullAndProcessMessages(messageReceiver, TAG, startTime); + } catch (IOException e) { + Log.i(TAG, "Failed to retrieve the envelope. Scheduling on JobManager.", e); + ApplicationContext.getInstance(context) + .getJobManager() + .add(new PushNotificationReceiveJob(context)); + } finally { + synchronized (foregroundLock) { + if (foregroundRunning.getAndSet(false)) { + GenericForegroundService.stopForegroundTask(context); + } else { + callback.finish(); } - Log.i(TAG, "Processing complete."); + taskCompleted.set(true); } + + decrementActiveGcmCount(); + Log.i(TAG, "Processing complete."); } - }.start(); + }); if (!foregroundRunning.get()) { new Thread("GcmForegroundServiceTimer") { @@ -121,4 +131,16 @@ public class GcmBroadcastReceiver extends WakefulBroadcastReceiver implements In }.start(); } } + + private static synchronized boolean incrementActiveGcmCount() { + if (activeCount < 2) { + activeCount++; + return true; + } + return false; + } + + private static synchronized void decrementActiveGcmCount() { + activeCount--; + } } \ No newline at end of file diff --git a/src/org/thoughtcrime/securesms/util/concurrent/SignalExecutors.java b/src/org/thoughtcrime/securesms/util/concurrent/SignalExecutors.java index e9a756a30a..62dba0b54e 100644 --- a/src/org/thoughtcrime/securesms/util/concurrent/SignalExecutors.java +++ b/src/org/thoughtcrime/securesms/util/concurrent/SignalExecutors.java @@ -4,7 +4,10 @@ import android.support.annotation.NonNull; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class SignalExecutors { @@ -16,4 +19,11 @@ public class SignalExecutors { return new Thread(r, "signal-io-" + counter.getAndIncrement()); } }); + + + public static ExecutorService newCachedSingleThreadExecutor(final String name) { + ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 15, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), r -> new Thread(r, name)); + executor.allowCoreThreadTimeOut(true); + return executor; + } }