Switch back to storing incoming messages in PushDatabase.

On a Pixel 3, this adds ~30-40ms of delay, but it's going to be a
requirement for upcoming migrations.
This commit is contained in:
Greyson Parrelli 2019-07-31 12:53:19 -04:00
parent fe1aa016b9
commit d3bed549f2
11 changed files with 211 additions and 128 deletions

View File

@ -0,0 +1,118 @@
package org.thoughtcrime.securesms;
import android.content.Context;
import androidx.annotation.NonNull;
import org.thoughtcrime.securesms.database.Address;
import org.thoughtcrime.securesms.database.DatabaseFactory;
import org.thoughtcrime.securesms.database.MessagingDatabase.SyncMessageId;
import org.thoughtcrime.securesms.database.MmsSmsDatabase;
import org.thoughtcrime.securesms.database.PushDatabase;
import org.thoughtcrime.securesms.database.RecipientDatabase;
import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobs.DirectoryRefreshJob;
import org.thoughtcrime.securesms.jobs.PushDecryptJob;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.recipients.Recipient;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import java.io.Closeable;
import java.util.Locale;
import java.util.concurrent.locks.ReentrantLock;
/**
* The central entry point for all envelopes that have been retrieved. Envelopes must be processed
* here to guarantee proper ordering.
*/
public class IncomingMessageProcessor {
private static final String TAG = Log.tag(IncomingMessageProcessor.class);
private final Context context;
private final ReentrantLock lock;
public IncomingMessageProcessor(@NonNull Context context) {
this.context = context;
this.lock = new ReentrantLock();
}
/**
* @return An instance of a Processor that will allow you to process messages in a thread safe
* way. Must be closed.
*/
public Processor acquire() {
lock.lock();
Thread current = Thread.currentThread();
Log.d(TAG, "Lock acquired by thread " + current.getId() + " (" + current.getName() + ")");
return new Processor(context);
}
private void release() {
Thread current = Thread.currentThread();
Log.d(TAG, "Lock about to be released by thread " + current.getId() + " (" + current.getName() + ")");
lock.unlock();
}
public class Processor implements Closeable {
private final Context context;
private final RecipientDatabase recipientDatabase;
private final PushDatabase pushDatabase;
private final MmsSmsDatabase mmsSmsDatabase;
private final JobManager jobManager;
private Processor(@NonNull Context context) {
this.context = context;
this.recipientDatabase = DatabaseFactory.getRecipientDatabase(context);
this.pushDatabase = DatabaseFactory.getPushDatabase(context);
this.mmsSmsDatabase = DatabaseFactory.getMmsSmsDatabase(context);
this.jobManager = ApplicationContext.getInstance(context).getJobManager();
}
public void processEnvelope(@NonNull SignalServiceEnvelope envelope) {
if (envelope.hasSource()) {
Address source = Address.fromExternal(context, envelope.getSource());
Recipient recipient = Recipient.from(context, source, false);
if (!isActiveNumber(recipient)) {
recipientDatabase.setRegistered(recipient, RecipientDatabase.RegisteredState.REGISTERED);
jobManager.add(new DirectoryRefreshJob(recipient, false));
}
}
if (envelope.isReceipt()) {
processReceipt(envelope);
} else if (envelope.isPreKeySignalMessage() || envelope.isSignalMessage() || envelope.isUnidentifiedSender()) {
processMessage(envelope);
} else {
Log.w(TAG, "Received envelope of unknown type: " + envelope.getType());
}
}
private void processMessage(@NonNull SignalServiceEnvelope envelope) {
Log.i(TAG, "Received message. Inserting in PushDatabase.");
long id = pushDatabase.insert(envelope);
jobManager.add(new PushDecryptJob(context, id));
}
private void processReceipt(@NonNull SignalServiceEnvelope envelope) {
Log.i(TAG, String.format(Locale.ENGLISH, "Received receipt: (XXXXX, %d)", envelope.getTimestamp()));
mmsSmsDatabase.incrementDeliveryReceiptCount(new SyncMessageId(Address.fromExternal(context, envelope.getSource()),
envelope.getTimestamp()),
System.currentTimeMillis());
}
private boolean isActiveNumber(@NonNull Recipient recipient) {
return recipient.resolve().getRegistered() == RecipientDatabase.RegisteredState.REGISTERED;
}
@Override
public void close() {
release();
}
}
}

View File

@ -4,6 +4,7 @@ import android.app.Application;
import androidx.annotation.NonNull;
import org.thoughtcrime.securesms.IncomingMessageProcessor;
import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess;
import org.whispersystems.signalservice.api.SignalServiceAccountManager;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
@ -51,6 +52,11 @@ public class ApplicationDependencies {
return instance.provider.getSignalServiceNetworkAccess();
}
public static synchronized @NonNull IncomingMessageProcessor getIncomingMessageProcessor() {
assertInitialization();
return instance.provider.getIncomingMessageProcessor();
}
private static void assertInitialization() {
if (instance == null) {
throw new UninitializedException();
@ -62,6 +68,7 @@ public class ApplicationDependencies {
@NonNull SignalServiceMessageSender getSignalServiceMessageSender();
@NonNull SignalServiceMessageReceiver getSignalServiceMessageReceiver();
@NonNull SignalServiceNetworkAccess getSignalServiceNetworkAccess();
@NonNull IncomingMessageProcessor getIncomingMessageProcessor();
}
private static class UninitializedException extends IllegalStateException {

View File

@ -6,6 +6,7 @@ import androidx.annotation.NonNull;
import org.greenrobot.eventbus.EventBus;
import org.thoughtcrime.securesms.BuildConfig;
import org.thoughtcrime.securesms.IncomingMessageProcessor;
import org.thoughtcrime.securesms.crypto.storage.SignalProtocolStoreImpl;
import org.thoughtcrime.securesms.events.ReminderUpdateEvent;
import org.thoughtcrime.securesms.logging.Log;
@ -36,6 +37,7 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr
private SignalServiceAccountManager accountManager;
private SignalServiceMessageSender messageSender;
private SignalServiceMessageReceiver messageReceiver;
private IncomingMessageProcessor incomingMessageProcessor;
public ApplicationDependencyProvider(@NonNull Context context, @NonNull SignalServiceNetworkAccess networkAccess) {
this.context = context.getApplicationContext();
@ -93,6 +95,15 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr
return networkAccess;
}
@Override
public @NonNull IncomingMessageProcessor getIncomingMessageProcessor() {
if (incomingMessageProcessor == null) {
incomingMessageProcessor = new IncomingMessageProcessor(context);
}
return incomingMessageProcessor;
}
private static class DynamicCredentialsProvider implements CredentialsProvider {
private final Context context;

View File

@ -9,6 +9,7 @@ import org.thoughtcrime.securesms.jobs.AvatarDownloadJob;
import org.thoughtcrime.securesms.jobs.CleanPreKeysJob;
import org.thoughtcrime.securesms.jobs.CreateSignedPreKeyJob;
import org.thoughtcrime.securesms.jobs.DirectoryRefreshJob;
import org.thoughtcrime.securesms.jobs.FailingJob;
import org.thoughtcrime.securesms.jobs.FcmRefreshJob;
import org.thoughtcrime.securesms.jobs.LocalBackupJob;
import org.thoughtcrime.securesms.jobs.MmsDownloadJob;
@ -21,7 +22,6 @@ import org.thoughtcrime.securesms.jobs.MultiDeviceGroupUpdateJob;
import org.thoughtcrime.securesms.jobs.MultiDeviceProfileKeyUpdateJob;
import org.thoughtcrime.securesms.jobs.MultiDeviceReadUpdateJob;
import org.thoughtcrime.securesms.jobs.MultiDeviceVerifiedUpdateJob;
import org.thoughtcrime.securesms.jobs.PushContentReceiveJob;
import org.thoughtcrime.securesms.jobs.PushDecryptJob;
import org.thoughtcrime.securesms.jobs.PushGroupSendJob;
import org.thoughtcrime.securesms.jobs.PushGroupUpdateJob;
@ -71,7 +71,7 @@ public class WorkManagerFactoryMappings {
put(MultiDeviceProfileKeyUpdateJob.class.getName(), MultiDeviceProfileKeyUpdateJob.KEY);
put(MultiDeviceReadUpdateJob.class.getName(), MultiDeviceReadUpdateJob.KEY);
put(MultiDeviceVerifiedUpdateJob.class.getName(), MultiDeviceVerifiedUpdateJob.KEY);
put(PushContentReceiveJob.class.getName(), PushContentReceiveJob.KEY);
put("PushContentReceiveJob", FailingJob.KEY);
put(PushDecryptJob.class.getName(), PushDecryptJob.KEY);
put(PushGroupSendJob.class.getName(), PushGroupSendJob.KEY);
put(PushGroupUpdateJob.class.getName(), PushGroupUpdateJob.KEY);

View File

@ -0,0 +1,46 @@
package org.thoughtcrime.securesms.jobs;
import androidx.annotation.NonNull;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
/**
* A job that always fails. Not useful on it's own, but you can register it's factory for jobs that
* have been removed that you'd like to fail instead of keeping around.
*/
public final class FailingJob extends Job {
public static final String KEY = "FailingJob";
private FailingJob(@NonNull Parameters parameters) {
super(parameters);
}
@Override
public @NonNull Data serialize() {
return Data.EMPTY;
}
@NonNull
@Override
public String getFactoryKey() {
return KEY;
}
@Override
public @NonNull Result run() {
return Result.failure();
}
@Override
public void onCanceled() {
}
public static final class Factory implements Job.Factory<FailingJob> {
@Override
public @NonNull FailingJob create(@NonNull Parameters parameters, @NonNull Data data) {
return new FailingJob(parameters);
}
}
}

View File

@ -45,7 +45,6 @@ public final class JobManagerFactories {
put(MultiDeviceStickerPackSyncJob.KEY, new MultiDeviceStickerPackSyncJob.Factory());
put(MultiDeviceVerifiedUpdateJob.KEY, new MultiDeviceVerifiedUpdateJob.Factory());
put(MultiDeviceViewOnceOpenJob.KEY, new MultiDeviceViewOnceOpenJob.Factory());
put(PushContentReceiveJob.KEY, new PushContentReceiveJob.Factory());
put(PushDecryptJob.KEY, new PushDecryptJob.Factory());
put(PushGroupSendJob.KEY, new PushGroupSendJob.Factory());
put(PushGroupUpdateJob.KEY, new PushGroupUpdateJob.Factory());
@ -72,6 +71,9 @@ public final class JobManagerFactories {
put(TrimThreadJob.KEY, new TrimThreadJob.Factory());
put(TypingSendJob.KEY, new TypingSendJob.Factory());
put(UpdateApkJob.KEY, new UpdateApkJob.Factory());
// Dead jobs
put("PushContentReceiveJob", new FailingJob.Factory());
}};
}

View File

@ -1,49 +0,0 @@
package org.thoughtcrime.securesms.jobs;
import android.content.Context;
import androidx.annotation.NonNull;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
public class PushContentReceiveJob extends PushReceivedJob {
public static final String KEY = "PushContentReceiveJob";
public PushContentReceiveJob(Context context) {
this(new Job.Parameters.Builder().build());
setContext(context);
}
private PushContentReceiveJob(@NonNull Job.Parameters parameters) {
super(parameters);
}
@Override
public @NonNull Data serialize() {
return Data.EMPTY;
}
@Override
public @NonNull String getFactoryKey() {
return KEY;
}
@Override
public void onRun() { }
@Override
public void onCanceled() { }
@Override
public boolean onShouldRetry(@NonNull Exception exception) {
return false;
}
public static final class Factory implements Job.Factory<PushContentReceiveJob> {
@Override
public @NonNull PushContentReceiveJob create(@NonNull Parameters parameters, @NonNull Data data) {
return new PushContentReceiveJob(parameters);
}
}
}

View File

@ -28,6 +28,7 @@ import org.signal.libsignal.metadata.ProtocolUntrustedIdentityException;
import org.signal.libsignal.metadata.SelfSendException;
import org.thoughtcrime.securesms.ApplicationContext;
import org.thoughtcrime.securesms.ConversationListActivity;
import org.thoughtcrime.securesms.IncomingMessageProcessor;
import org.thoughtcrime.securesms.R;
import org.thoughtcrime.securesms.attachments.Attachment;
import org.thoughtcrime.securesms.attachments.DatabaseAttachment;
@ -88,6 +89,7 @@ import org.thoughtcrime.securesms.sms.OutgoingEncryptedMessage;
import org.thoughtcrime.securesms.sms.OutgoingEndSessionMessage;
import org.thoughtcrime.securesms.sms.OutgoingTextMessage;
import org.thoughtcrime.securesms.stickers.StickerLocator;
import org.thoughtcrime.securesms.transport.RetryLaterException;
import org.thoughtcrime.securesms.util.GroupUtil;
import org.thoughtcrime.securesms.util.Hex;
import org.thoughtcrime.securesms.util.IdentityUtil;
@ -141,10 +143,6 @@ public class PushDecryptJob extends BaseJob {
private long messageId;
private long smsMessageId;
public PushDecryptJob(Context context) {
this(context, -1);
}
public PushDecryptJob(Context context, long pushMessageId) {
this(context, pushMessageId, -1);
}
@ -152,7 +150,7 @@ public class PushDecryptJob extends BaseJob {
public PushDecryptJob(Context context, long pushMessageId, long smsMessageId) {
this(new Job.Parameters.Builder()
.setQueue("__PUSH_DECRYPT_JOB__")
.setMaxAttempts(10)
.setMaxAttempts(Parameters.UNLIMITED)
.build(),
pushMessageId,
smsMessageId);
@ -179,12 +177,11 @@ public class PushDecryptJob extends BaseJob {
}
@Override
public void onRun() throws NoSuchMessageException {
synchronized (PushReceivedJob.RECEIVE_LOCK) {
public void onRun() throws NoSuchMessageException, RetryLaterException {
if (needsMigration()) {
Log.w(TAG, "Skipping, waiting for migration...");
Log.w(TAG, "Migration is still needed.");
postMigrationNotification();
return;
throw new RetryLaterException();
}
PushDatabase database = DatabaseFactory.getPushDatabase(context);
@ -194,29 +191,14 @@ public class PushDecryptJob extends BaseJob {
handleMessage(envelope, optionalSmsMessageId);
database.delete(messageId);
}
}
@Override
public boolean onShouldRetry(@NonNull Exception exception) {
return false;
return exception instanceof RetryLaterException;
}
@Override
public void onCanceled() {
}
public void processMessage(@NonNull SignalServiceEnvelope envelope) {
synchronized (PushReceivedJob.RECEIVE_LOCK) {
if (needsMigration()) {
Log.w(TAG, "Skipping and storing envelope, waiting for migration...");
DatabaseFactory.getPushDatabase(context).insert(envelope);
postMigrationNotification();
return;
}
handleMessage(envelope, Optional.absent());
}
}
private boolean needsMigration() {

View File

@ -3,6 +3,7 @@ package org.thoughtcrime.securesms.jobs;
import android.content.Context;
import androidx.annotation.NonNull;
import org.thoughtcrime.securesms.IncomingMessageProcessor.Processor;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
@ -15,7 +16,7 @@ import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException
import java.io.IOException;
public class PushNotificationReceiveJob extends PushReceivedJob {
public class PushNotificationReceiveJob extends BaseJob {
public static final String KEY = "PushNotificationReceiveJob";
@ -51,10 +52,10 @@ public class PushNotificationReceiveJob extends PushReceivedJob {
}
public void pullAndProcessMessages(SignalServiceMessageReceiver receiver, String tag, long startTime) throws IOException {
synchronized (PushReceivedJob.RECEIVE_LOCK) {
try (Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) {
receiver.retrieveMessages(envelope -> {
Log.i(tag, "Retrieved an envelope." + timeSuffix(startTime));
processEnvelope(envelope);
processor.processEnvelope(envelope);
Log.i(tag, "Successfully processed an envelope." + timeSuffix(startTime));
});
TextSecurePreferences.setNeedsMessagePull(context, false);

View File

@ -17,46 +17,9 @@ public abstract class PushReceivedJob extends BaseJob {
private static final String TAG = PushReceivedJob.class.getSimpleName();
public static final Object RECEIVE_LOCK = new Object();
protected PushReceivedJob(Job.Parameters parameters) {
super(parameters);
}
public void processEnvelope(@NonNull SignalServiceEnvelope envelope) {
synchronized (RECEIVE_LOCK) {
if (envelope.hasSource()) {
Address source = Address.fromExternal(context, envelope.getSource());
Recipient recipient = Recipient.from(context, source, false);
if (!isActiveNumber(recipient)) {
DatabaseFactory.getRecipientDatabase(context).setRegistered(recipient, RecipientDatabase.RegisteredState.REGISTERED);
ApplicationContext.getInstance(context).getJobManager().add(new DirectoryRefreshJob(recipient, false));
}
}
if (envelope.isReceipt()) {
handleReceipt(envelope);
} else if (envelope.isPreKeySignalMessage() || envelope.isSignalMessage() || envelope.isUnidentifiedSender()) {
handleMessage(envelope);
} else {
Log.w(TAG, "Received envelope of unknown type: " + envelope.getType());
}
}
}
private void handleMessage(SignalServiceEnvelope envelope) {
new PushDecryptJob(context).processMessage(envelope);
}
@SuppressLint("DefaultLocale")
private void handleReceipt(SignalServiceEnvelope envelope) {
Log.i(TAG, String.format("Received receipt: (XXXXX, %d)", envelope.getTimestamp()));
DatabaseFactory.getMmsSmsDatabase(context).incrementDeliveryReceiptCount(new SyncMessageId(Address.fromExternal(context, envelope.getSource()),
envelope.getTimestamp()), System.currentTimeMillis());
}
private boolean isActiveNumber(@NonNull Recipient recipient) {
return recipient.resolve().getRegistered() == RecipientDatabase.RegisteredState.REGISTERED;
}
}

View File

@ -12,6 +12,7 @@ import androidx.annotation.Nullable;
import androidx.core.app.NotificationCompat;
import androidx.core.content.ContextCompat;
import org.thoughtcrime.securesms.IncomingMessageProcessor.Processor;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.ConstraintObserver;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
@ -20,7 +21,6 @@ import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.ApplicationContext;
import org.thoughtcrime.securesms.R;
import org.thoughtcrime.securesms.jobs.PushContentReceiveJob;
import org.thoughtcrime.securesms.notifications.NotificationChannels;
import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
@ -157,7 +157,9 @@ public class IncomingMessageObserver implements ConstraintObserver.Notifier {
localPipe.read(REQUEST_TIMEOUT_MINUTES, TimeUnit.MINUTES,
envelope -> {
Log.i(TAG, "Retrieved envelope! " + String.valueOf(envelope.getSource()));
new PushContentReceiveJob(context).processEnvelope(envelope);
try (Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) {
processor.processEnvelope(envelope);
}
});
} catch (TimeoutException e) {
Log.w(TAG, "Application level read timeout...");