From d3bed549f22f47173c37f642e66a4bed418d9af5 Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Wed, 31 Jul 2019 12:53:19 -0400 Subject: [PATCH] Switch back to storing incoming messages in PushDatabase. On a Pixel 3, this adds ~30-40ms of delay, but it's going to be a requirement for upcoming migrations. --- .../securesms/IncomingMessageProcessor.java | 118 ++++++++++++++++++ .../dependencies/ApplicationDependencies.java | 7 ++ .../ApplicationDependencyProvider.java | 11 ++ .../migration/WorkManagerFactoryMappings.java | 4 +- .../securesms/jobs/FailingJob.java | 46 +++++++ .../securesms/jobs/JobManagerFactories.java | 4 +- .../securesms/jobs/PushContentReceiveJob.java | 49 -------- .../securesms/jobs/PushDecryptJob.java | 50 +++----- .../jobs/PushNotificationReceiveJob.java | 7 +- .../securesms/jobs/PushReceivedJob.java | 37 ------ .../service/IncomingMessageObserver.java | 6 +- 11 files changed, 211 insertions(+), 128 deletions(-) create mode 100644 src/org/thoughtcrime/securesms/IncomingMessageProcessor.java create mode 100644 src/org/thoughtcrime/securesms/jobs/FailingJob.java delete mode 100644 src/org/thoughtcrime/securesms/jobs/PushContentReceiveJob.java diff --git a/src/org/thoughtcrime/securesms/IncomingMessageProcessor.java b/src/org/thoughtcrime/securesms/IncomingMessageProcessor.java new file mode 100644 index 0000000000..c088e36b62 --- /dev/null +++ b/src/org/thoughtcrime/securesms/IncomingMessageProcessor.java @@ -0,0 +1,118 @@ +package org.thoughtcrime.securesms; + +import android.content.Context; + +import androidx.annotation.NonNull; + +import org.thoughtcrime.securesms.database.Address; +import org.thoughtcrime.securesms.database.DatabaseFactory; +import org.thoughtcrime.securesms.database.MessagingDatabase.SyncMessageId; +import org.thoughtcrime.securesms.database.MmsSmsDatabase; +import org.thoughtcrime.securesms.database.PushDatabase; +import org.thoughtcrime.securesms.database.RecipientDatabase; +import org.thoughtcrime.securesms.jobmanager.JobManager; +import org.thoughtcrime.securesms.jobs.DirectoryRefreshJob; +import org.thoughtcrime.securesms.jobs.PushDecryptJob; +import org.thoughtcrime.securesms.logging.Log; +import org.thoughtcrime.securesms.recipients.Recipient; +import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; + +import java.io.Closeable; +import java.util.Locale; +import java.util.concurrent.locks.ReentrantLock; + +/** + * The central entry point for all envelopes that have been retrieved. Envelopes must be processed + * here to guarantee proper ordering. + */ +public class IncomingMessageProcessor { + + private static final String TAG = Log.tag(IncomingMessageProcessor.class); + + private final Context context; + private final ReentrantLock lock; + + public IncomingMessageProcessor(@NonNull Context context) { + this.context = context; + this.lock = new ReentrantLock(); + } + + /** + * @return An instance of a Processor that will allow you to process messages in a thread safe + * way. Must be closed. + */ + public Processor acquire() { + lock.lock(); + + Thread current = Thread.currentThread(); + Log.d(TAG, "Lock acquired by thread " + current.getId() + " (" + current.getName() + ")"); + + return new Processor(context); + } + + private void release() { + Thread current = Thread.currentThread(); + Log.d(TAG, "Lock about to be released by thread " + current.getId() + " (" + current.getName() + ")"); + + lock.unlock(); + } + + public class Processor implements Closeable { + + private final Context context; + private final RecipientDatabase recipientDatabase; + private final PushDatabase pushDatabase; + private final MmsSmsDatabase mmsSmsDatabase; + private final JobManager jobManager; + + private Processor(@NonNull Context context) { + this.context = context; + this.recipientDatabase = DatabaseFactory.getRecipientDatabase(context); + this.pushDatabase = DatabaseFactory.getPushDatabase(context); + this.mmsSmsDatabase = DatabaseFactory.getMmsSmsDatabase(context); + this.jobManager = ApplicationContext.getInstance(context).getJobManager(); + } + + public void processEnvelope(@NonNull SignalServiceEnvelope envelope) { + if (envelope.hasSource()) { + Address source = Address.fromExternal(context, envelope.getSource()); + Recipient recipient = Recipient.from(context, source, false); + + if (!isActiveNumber(recipient)) { + recipientDatabase.setRegistered(recipient, RecipientDatabase.RegisteredState.REGISTERED); + jobManager.add(new DirectoryRefreshJob(recipient, false)); + } + } + + if (envelope.isReceipt()) { + processReceipt(envelope); + } else if (envelope.isPreKeySignalMessage() || envelope.isSignalMessage() || envelope.isUnidentifiedSender()) { + processMessage(envelope); + } else { + Log.w(TAG, "Received envelope of unknown type: " + envelope.getType()); + } + } + + private void processMessage(@NonNull SignalServiceEnvelope envelope) { + Log.i(TAG, "Received message. Inserting in PushDatabase."); + long id = pushDatabase.insert(envelope); + jobManager.add(new PushDecryptJob(context, id)); + } + + private void processReceipt(@NonNull SignalServiceEnvelope envelope) { + Log.i(TAG, String.format(Locale.ENGLISH, "Received receipt: (XXXXX, %d)", envelope.getTimestamp())); + mmsSmsDatabase.incrementDeliveryReceiptCount(new SyncMessageId(Address.fromExternal(context, envelope.getSource()), + envelope.getTimestamp()), + System.currentTimeMillis()); + } + + private boolean isActiveNumber(@NonNull Recipient recipient) { + return recipient.resolve().getRegistered() == RecipientDatabase.RegisteredState.REGISTERED; + } + + @Override + public void close() { + release(); + } + } +} diff --git a/src/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java b/src/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java index 1f905f352e..8e09a5a0fb 100644 --- a/src/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java +++ b/src/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java @@ -4,6 +4,7 @@ import android.app.Application; import androidx.annotation.NonNull; +import org.thoughtcrime.securesms.IncomingMessageProcessor; import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess; import org.whispersystems.signalservice.api.SignalServiceAccountManager; import org.whispersystems.signalservice.api.SignalServiceMessageReceiver; @@ -51,6 +52,11 @@ public class ApplicationDependencies { return instance.provider.getSignalServiceNetworkAccess(); } + public static synchronized @NonNull IncomingMessageProcessor getIncomingMessageProcessor() { + assertInitialization(); + return instance.provider.getIncomingMessageProcessor(); + } + private static void assertInitialization() { if (instance == null) { throw new UninitializedException(); @@ -62,6 +68,7 @@ public class ApplicationDependencies { @NonNull SignalServiceMessageSender getSignalServiceMessageSender(); @NonNull SignalServiceMessageReceiver getSignalServiceMessageReceiver(); @NonNull SignalServiceNetworkAccess getSignalServiceNetworkAccess(); + @NonNull IncomingMessageProcessor getIncomingMessageProcessor(); } private static class UninitializedException extends IllegalStateException { diff --git a/src/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java b/src/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java index ec1f93b3db..29dbc2ad97 100644 --- a/src/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java +++ b/src/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java @@ -6,6 +6,7 @@ import androidx.annotation.NonNull; import org.greenrobot.eventbus.EventBus; import org.thoughtcrime.securesms.BuildConfig; +import org.thoughtcrime.securesms.IncomingMessageProcessor; import org.thoughtcrime.securesms.crypto.storage.SignalProtocolStoreImpl; import org.thoughtcrime.securesms.events.ReminderUpdateEvent; import org.thoughtcrime.securesms.logging.Log; @@ -36,6 +37,7 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr private SignalServiceAccountManager accountManager; private SignalServiceMessageSender messageSender; private SignalServiceMessageReceiver messageReceiver; + private IncomingMessageProcessor incomingMessageProcessor; public ApplicationDependencyProvider(@NonNull Context context, @NonNull SignalServiceNetworkAccess networkAccess) { this.context = context.getApplicationContext(); @@ -93,6 +95,15 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr return networkAccess; } + @Override + public @NonNull IncomingMessageProcessor getIncomingMessageProcessor() { + if (incomingMessageProcessor == null) { + incomingMessageProcessor = new IncomingMessageProcessor(context); + } + + return incomingMessageProcessor; + } + private static class DynamicCredentialsProvider implements CredentialsProvider { private final Context context; diff --git a/src/org/thoughtcrime/securesms/jobmanager/migration/WorkManagerFactoryMappings.java b/src/org/thoughtcrime/securesms/jobmanager/migration/WorkManagerFactoryMappings.java index d4286d7ccd..00e6f1488d 100644 --- a/src/org/thoughtcrime/securesms/jobmanager/migration/WorkManagerFactoryMappings.java +++ b/src/org/thoughtcrime/securesms/jobmanager/migration/WorkManagerFactoryMappings.java @@ -9,6 +9,7 @@ import org.thoughtcrime.securesms.jobs.AvatarDownloadJob; import org.thoughtcrime.securesms.jobs.CleanPreKeysJob; import org.thoughtcrime.securesms.jobs.CreateSignedPreKeyJob; import org.thoughtcrime.securesms.jobs.DirectoryRefreshJob; +import org.thoughtcrime.securesms.jobs.FailingJob; import org.thoughtcrime.securesms.jobs.FcmRefreshJob; import org.thoughtcrime.securesms.jobs.LocalBackupJob; import org.thoughtcrime.securesms.jobs.MmsDownloadJob; @@ -21,7 +22,6 @@ import org.thoughtcrime.securesms.jobs.MultiDeviceGroupUpdateJob; import org.thoughtcrime.securesms.jobs.MultiDeviceProfileKeyUpdateJob; import org.thoughtcrime.securesms.jobs.MultiDeviceReadUpdateJob; import org.thoughtcrime.securesms.jobs.MultiDeviceVerifiedUpdateJob; -import org.thoughtcrime.securesms.jobs.PushContentReceiveJob; import org.thoughtcrime.securesms.jobs.PushDecryptJob; import org.thoughtcrime.securesms.jobs.PushGroupSendJob; import org.thoughtcrime.securesms.jobs.PushGroupUpdateJob; @@ -71,7 +71,7 @@ public class WorkManagerFactoryMappings { put(MultiDeviceProfileKeyUpdateJob.class.getName(), MultiDeviceProfileKeyUpdateJob.KEY); put(MultiDeviceReadUpdateJob.class.getName(), MultiDeviceReadUpdateJob.KEY); put(MultiDeviceVerifiedUpdateJob.class.getName(), MultiDeviceVerifiedUpdateJob.KEY); - put(PushContentReceiveJob.class.getName(), PushContentReceiveJob.KEY); + put("PushContentReceiveJob", FailingJob.KEY); put(PushDecryptJob.class.getName(), PushDecryptJob.KEY); put(PushGroupSendJob.class.getName(), PushGroupSendJob.KEY); put(PushGroupUpdateJob.class.getName(), PushGroupUpdateJob.KEY); diff --git a/src/org/thoughtcrime/securesms/jobs/FailingJob.java b/src/org/thoughtcrime/securesms/jobs/FailingJob.java new file mode 100644 index 0000000000..87808eb965 --- /dev/null +++ b/src/org/thoughtcrime/securesms/jobs/FailingJob.java @@ -0,0 +1,46 @@ +package org.thoughtcrime.securesms.jobs; + +import androidx.annotation.NonNull; + +import org.thoughtcrime.securesms.jobmanager.Data; +import org.thoughtcrime.securesms.jobmanager.Job; + +/** + * A job that always fails. Not useful on it's own, but you can register it's factory for jobs that + * have been removed that you'd like to fail instead of keeping around. + */ +public final class FailingJob extends Job { + + public static final String KEY = "FailingJob"; + + private FailingJob(@NonNull Parameters parameters) { + super(parameters); + } + + @Override + public @NonNull Data serialize() { + return Data.EMPTY; + } + + @NonNull + @Override + public String getFactoryKey() { + return KEY; + } + + @Override + public @NonNull Result run() { + return Result.failure(); + } + + @Override + public void onCanceled() { + } + + public static final class Factory implements Job.Factory { + @Override + public @NonNull FailingJob create(@NonNull Parameters parameters, @NonNull Data data) { + return new FailingJob(parameters); + } + } +} diff --git a/src/org/thoughtcrime/securesms/jobs/JobManagerFactories.java b/src/org/thoughtcrime/securesms/jobs/JobManagerFactories.java index fe9d2ce24d..58ffd28aba 100644 --- a/src/org/thoughtcrime/securesms/jobs/JobManagerFactories.java +++ b/src/org/thoughtcrime/securesms/jobs/JobManagerFactories.java @@ -45,7 +45,6 @@ public final class JobManagerFactories { put(MultiDeviceStickerPackSyncJob.KEY, new MultiDeviceStickerPackSyncJob.Factory()); put(MultiDeviceVerifiedUpdateJob.KEY, new MultiDeviceVerifiedUpdateJob.Factory()); put(MultiDeviceViewOnceOpenJob.KEY, new MultiDeviceViewOnceOpenJob.Factory()); - put(PushContentReceiveJob.KEY, new PushContentReceiveJob.Factory()); put(PushDecryptJob.KEY, new PushDecryptJob.Factory()); put(PushGroupSendJob.KEY, new PushGroupSendJob.Factory()); put(PushGroupUpdateJob.KEY, new PushGroupUpdateJob.Factory()); @@ -72,6 +71,9 @@ public final class JobManagerFactories { put(TrimThreadJob.KEY, new TrimThreadJob.Factory()); put(TypingSendJob.KEY, new TypingSendJob.Factory()); put(UpdateApkJob.KEY, new UpdateApkJob.Factory()); + + // Dead jobs + put("PushContentReceiveJob", new FailingJob.Factory()); }}; } diff --git a/src/org/thoughtcrime/securesms/jobs/PushContentReceiveJob.java b/src/org/thoughtcrime/securesms/jobs/PushContentReceiveJob.java deleted file mode 100644 index 3032093e49..0000000000 --- a/src/org/thoughtcrime/securesms/jobs/PushContentReceiveJob.java +++ /dev/null @@ -1,49 +0,0 @@ -package org.thoughtcrime.securesms.jobs; - -import android.content.Context; -import androidx.annotation.NonNull; - -import org.thoughtcrime.securesms.jobmanager.Data; -import org.thoughtcrime.securesms.jobmanager.Job; - -public class PushContentReceiveJob extends PushReceivedJob { - - public static final String KEY = "PushContentReceiveJob"; - - public PushContentReceiveJob(Context context) { - this(new Job.Parameters.Builder().build()); - setContext(context); - } - - private PushContentReceiveJob(@NonNull Job.Parameters parameters) { - super(parameters); - } - - @Override - public @NonNull Data serialize() { - return Data.EMPTY; - } - - @Override - public @NonNull String getFactoryKey() { - return KEY; - } - - @Override - public void onRun() { } - - @Override - public void onCanceled() { } - - @Override - public boolean onShouldRetry(@NonNull Exception exception) { - return false; - } - - public static final class Factory implements Job.Factory { - @Override - public @NonNull PushContentReceiveJob create(@NonNull Parameters parameters, @NonNull Data data) { - return new PushContentReceiveJob(parameters); - } - } -} diff --git a/src/org/thoughtcrime/securesms/jobs/PushDecryptJob.java b/src/org/thoughtcrime/securesms/jobs/PushDecryptJob.java index 1502b3dae9..c9b345c98d 100644 --- a/src/org/thoughtcrime/securesms/jobs/PushDecryptJob.java +++ b/src/org/thoughtcrime/securesms/jobs/PushDecryptJob.java @@ -28,6 +28,7 @@ import org.signal.libsignal.metadata.ProtocolUntrustedIdentityException; import org.signal.libsignal.metadata.SelfSendException; import org.thoughtcrime.securesms.ApplicationContext; import org.thoughtcrime.securesms.ConversationListActivity; +import org.thoughtcrime.securesms.IncomingMessageProcessor; import org.thoughtcrime.securesms.R; import org.thoughtcrime.securesms.attachments.Attachment; import org.thoughtcrime.securesms.attachments.DatabaseAttachment; @@ -88,6 +89,7 @@ import org.thoughtcrime.securesms.sms.OutgoingEncryptedMessage; import org.thoughtcrime.securesms.sms.OutgoingEndSessionMessage; import org.thoughtcrime.securesms.sms.OutgoingTextMessage; import org.thoughtcrime.securesms.stickers.StickerLocator; +import org.thoughtcrime.securesms.transport.RetryLaterException; import org.thoughtcrime.securesms.util.GroupUtil; import org.thoughtcrime.securesms.util.Hex; import org.thoughtcrime.securesms.util.IdentityUtil; @@ -141,10 +143,6 @@ public class PushDecryptJob extends BaseJob { private long messageId; private long smsMessageId; - public PushDecryptJob(Context context) { - this(context, -1); - } - public PushDecryptJob(Context context, long pushMessageId) { this(context, pushMessageId, -1); } @@ -152,7 +150,7 @@ public class PushDecryptJob extends BaseJob { public PushDecryptJob(Context context, long pushMessageId, long smsMessageId) { this(new Job.Parameters.Builder() .setQueue("__PUSH_DECRYPT_JOB__") - .setMaxAttempts(10) + .setMaxAttempts(Parameters.UNLIMITED) .build(), pushMessageId, smsMessageId); @@ -179,44 +177,28 @@ public class PushDecryptJob extends BaseJob { } @Override - public void onRun() throws NoSuchMessageException { - synchronized (PushReceivedJob.RECEIVE_LOCK) { - if (needsMigration()) { - Log.w(TAG, "Skipping, waiting for migration..."); - postMigrationNotification(); - return; - } - - PushDatabase database = DatabaseFactory.getPushDatabase(context); - SignalServiceEnvelope envelope = database.get(messageId); - Optional optionalSmsMessageId = smsMessageId > 0 ? Optional.of(smsMessageId) : Optional.absent(); - - handleMessage(envelope, optionalSmsMessageId); - database.delete(messageId); + public void onRun() throws NoSuchMessageException, RetryLaterException { + if (needsMigration()) { + Log.w(TAG, "Migration is still needed."); + postMigrationNotification(); + throw new RetryLaterException(); } + + PushDatabase database = DatabaseFactory.getPushDatabase(context); + SignalServiceEnvelope envelope = database.get(messageId); + Optional optionalSmsMessageId = smsMessageId > 0 ? Optional.of(smsMessageId) : Optional.absent(); + + handleMessage(envelope, optionalSmsMessageId); + database.delete(messageId); } @Override public boolean onShouldRetry(@NonNull Exception exception) { - return false; + return exception instanceof RetryLaterException; } @Override public void onCanceled() { - - } - - public void processMessage(@NonNull SignalServiceEnvelope envelope) { - synchronized (PushReceivedJob.RECEIVE_LOCK) { - if (needsMigration()) { - Log.w(TAG, "Skipping and storing envelope, waiting for migration..."); - DatabaseFactory.getPushDatabase(context).insert(envelope); - postMigrationNotification(); - return; - } - - handleMessage(envelope, Optional.absent()); - } } private boolean needsMigration() { diff --git a/src/org/thoughtcrime/securesms/jobs/PushNotificationReceiveJob.java b/src/org/thoughtcrime/securesms/jobs/PushNotificationReceiveJob.java index 1a1b415f59..1ca26f24c7 100644 --- a/src/org/thoughtcrime/securesms/jobs/PushNotificationReceiveJob.java +++ b/src/org/thoughtcrime/securesms/jobs/PushNotificationReceiveJob.java @@ -3,6 +3,7 @@ package org.thoughtcrime.securesms.jobs; import android.content.Context; import androidx.annotation.NonNull; +import org.thoughtcrime.securesms.IncomingMessageProcessor.Processor; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.jobmanager.Data; import org.thoughtcrime.securesms.jobmanager.Job; @@ -15,7 +16,7 @@ import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException import java.io.IOException; -public class PushNotificationReceiveJob extends PushReceivedJob { +public class PushNotificationReceiveJob extends BaseJob { public static final String KEY = "PushNotificationReceiveJob"; @@ -51,10 +52,10 @@ public class PushNotificationReceiveJob extends PushReceivedJob { } public void pullAndProcessMessages(SignalServiceMessageReceiver receiver, String tag, long startTime) throws IOException { - synchronized (PushReceivedJob.RECEIVE_LOCK) { + try (Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) { receiver.retrieveMessages(envelope -> { Log.i(tag, "Retrieved an envelope." + timeSuffix(startTime)); - processEnvelope(envelope); + processor.processEnvelope(envelope); Log.i(tag, "Successfully processed an envelope." + timeSuffix(startTime)); }); TextSecurePreferences.setNeedsMessagePull(context, false); diff --git a/src/org/thoughtcrime/securesms/jobs/PushReceivedJob.java b/src/org/thoughtcrime/securesms/jobs/PushReceivedJob.java index cdd388503a..5f689d0526 100644 --- a/src/org/thoughtcrime/securesms/jobs/PushReceivedJob.java +++ b/src/org/thoughtcrime/securesms/jobs/PushReceivedJob.java @@ -17,46 +17,9 @@ public abstract class PushReceivedJob extends BaseJob { private static final String TAG = PushReceivedJob.class.getSimpleName(); - public static final Object RECEIVE_LOCK = new Object(); protected PushReceivedJob(Job.Parameters parameters) { super(parameters); } - public void processEnvelope(@NonNull SignalServiceEnvelope envelope) { - synchronized (RECEIVE_LOCK) { - if (envelope.hasSource()) { - Address source = Address.fromExternal(context, envelope.getSource()); - Recipient recipient = Recipient.from(context, source, false); - - if (!isActiveNumber(recipient)) { - DatabaseFactory.getRecipientDatabase(context).setRegistered(recipient, RecipientDatabase.RegisteredState.REGISTERED); - ApplicationContext.getInstance(context).getJobManager().add(new DirectoryRefreshJob(recipient, false)); - } - } - - if (envelope.isReceipt()) { - handleReceipt(envelope); - } else if (envelope.isPreKeySignalMessage() || envelope.isSignalMessage() || envelope.isUnidentifiedSender()) { - handleMessage(envelope); - } else { - Log.w(TAG, "Received envelope of unknown type: " + envelope.getType()); - } - } - } - - private void handleMessage(SignalServiceEnvelope envelope) { - new PushDecryptJob(context).processMessage(envelope); - } - - @SuppressLint("DefaultLocale") - private void handleReceipt(SignalServiceEnvelope envelope) { - Log.i(TAG, String.format("Received receipt: (XXXXX, %d)", envelope.getTimestamp())); - DatabaseFactory.getMmsSmsDatabase(context).incrementDeliveryReceiptCount(new SyncMessageId(Address.fromExternal(context, envelope.getSource()), - envelope.getTimestamp()), System.currentTimeMillis()); - } - - private boolean isActiveNumber(@NonNull Recipient recipient) { - return recipient.resolve().getRegistered() == RecipientDatabase.RegisteredState.REGISTERED; - } } diff --git a/src/org/thoughtcrime/securesms/service/IncomingMessageObserver.java b/src/org/thoughtcrime/securesms/service/IncomingMessageObserver.java index 402e7190e9..af3e707a73 100644 --- a/src/org/thoughtcrime/securesms/service/IncomingMessageObserver.java +++ b/src/org/thoughtcrime/securesms/service/IncomingMessageObserver.java @@ -12,6 +12,7 @@ import androidx.annotation.Nullable; import androidx.core.app.NotificationCompat; import androidx.core.content.ContextCompat; +import org.thoughtcrime.securesms.IncomingMessageProcessor.Processor; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.jobmanager.ConstraintObserver; import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint; @@ -20,7 +21,6 @@ import org.thoughtcrime.securesms.logging.Log; import org.thoughtcrime.securesms.ApplicationContext; import org.thoughtcrime.securesms.R; -import org.thoughtcrime.securesms.jobs.PushContentReceiveJob; import org.thoughtcrime.securesms.notifications.NotificationChannels; import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess; import org.thoughtcrime.securesms.util.TextSecurePreferences; @@ -157,7 +157,9 @@ public class IncomingMessageObserver implements ConstraintObserver.Notifier { localPipe.read(REQUEST_TIMEOUT_MINUTES, TimeUnit.MINUTES, envelope -> { Log.i(TAG, "Retrieved envelope! " + String.valueOf(envelope.getSource())); - new PushContentReceiveJob(context).processEnvelope(envelope); + try (Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) { + processor.processEnvelope(envelope); + } }); } catch (TimeoutException e) { Log.w(TAG, "Application level read timeout...");