Add reserved job runners for inbound and outbound messages.

This commit is contained in:
Greyson Parrelli 2020-06-25 08:48:23 -07:00
parent 63d6ab6fa7
commit 152578e576
6 changed files with 84 additions and 10 deletions

View File

@ -7,6 +7,15 @@ import androidx.annotation.NonNull;
import org.greenrobot.eventbus.EventBus; import org.greenrobot.eventbus.EventBus;
import org.thoughtcrime.securesms.BuildConfig; import org.thoughtcrime.securesms.BuildConfig;
import org.thoughtcrime.securesms.jobmanager.impl.FactoryJobPredicate;
import org.thoughtcrime.securesms.jobs.MarkerJob;
import org.thoughtcrime.securesms.jobs.PushDecryptMessageJob;
import org.thoughtcrime.securesms.jobs.PushGroupSendJob;
import org.thoughtcrime.securesms.jobs.PushMediaSendJob;
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob;
import org.thoughtcrime.securesms.jobs.PushTextSendJob;
import org.thoughtcrime.securesms.jobs.ReactionSendJob;
import org.thoughtcrime.securesms.jobs.TypingSendJob;
import org.thoughtcrime.securesms.messages.IncomingMessageProcessor; import org.thoughtcrime.securesms.messages.IncomingMessageProcessor;
import org.thoughtcrime.securesms.crypto.storage.SignalProtocolStoreImpl; import org.thoughtcrime.securesms.crypto.storage.SignalProtocolStoreImpl;
import org.thoughtcrime.securesms.database.DatabaseFactory; import org.thoughtcrime.securesms.database.DatabaseFactory;
@ -136,6 +145,8 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr
.setConstraintObservers(JobManagerFactories.getConstraintObservers(context)) .setConstraintObservers(JobManagerFactories.getConstraintObservers(context))
.setJobStorage(new FastJobStorage(DatabaseFactory.getJobDatabase(context), SignalExecutors.newCachedSingleThreadExecutor("signal-fast-job-storage"))) .setJobStorage(new FastJobStorage(DatabaseFactory.getJobDatabase(context), SignalExecutors.newCachedSingleThreadExecutor("signal-fast-job-storage")))
.setJobMigrator(new JobMigrator(TextSecurePreferences.getJobManagerVersion(context), JobManager.CURRENT_VERSION, JobManagerFactories.getJobMigrations(context))) .setJobMigrator(new JobMigrator(TextSecurePreferences.getJobManagerVersion(context), JobManager.CURRENT_VERSION, JobManagerFactories.getJobMigrations(context)))
.addReservedJobRunner(new FactoryJobPredicate(PushDecryptMessageJob.KEY, PushProcessMessageJob.KEY, MarkerJob.KEY))
.addReservedJobRunner(new FactoryJobPredicate(PushTextSendJob.KEY, PushMediaSendJob.KEY, PushGroupSendJob.KEY, ReactionSendJob.KEY, TypingSendJob.KEY))
.build()); .build());
} }

View File

@ -242,11 +242,11 @@ class JobController {
* When the job returned from this method has been run, you must call {@link #onJobFinished(Job)}. * When the job returned from this method has been run, you must call {@link #onJobFinished(Job)}.
*/ */
@WorkerThread @WorkerThread
synchronized @NonNull Job pullNextEligibleJobForExecution() { synchronized @NonNull Job pullNextEligibleJobForExecution(@NonNull JobPredicate predicate) {
try { try {
Job job; Job job;
while ((job = getNextEligibleJobForExecution()) == null) { while ((job = getNextEligibleJobForExecution(predicate)) == null) {
if (runningJobs.isEmpty()) { if (runningJobs.isEmpty()) {
debouncer.publish(callback::onEmpty); debouncer.publish(callback::onEmpty);
} }
@ -387,8 +387,10 @@ class JobController {
} }
@WorkerThread @WorkerThread
private @Nullable Job getNextEligibleJobForExecution() { private @Nullable Job getNextEligibleJobForExecution(@NonNull JobPredicate predicate) {
List<JobSpec> jobSpecs = jobStorage.getPendingJobsWithNoDependenciesInCreatedOrder(System.currentTimeMillis()); List<JobSpec> jobSpecs = Stream.of(jobStorage.getPendingJobsWithNoDependenciesInCreatedOrder(System.currentTimeMillis()))
.filter(predicate::shouldRun)
.toList();
for (JobSpec jobSpec : jobSpecs) { for (JobSpec jobSpec : jobSpecs) {
List<ConstraintSpec> constraintSpecs = jobStorage.getConstraintSpecs(jobSpec.getId()); List<ConstraintSpec> constraintSpecs = jobStorage.getConstraintSpecs(jobSpec.getId());

View File

@ -104,9 +104,16 @@ public class JobManager implements ConstraintObserver.Notifier {
*/ */
public void beginJobLoop() { public void beginJobLoop() {
runOnExecutor(()-> { runOnExecutor(()-> {
int id = 0;
for (int i = 0; i < configuration.getJobThreadCount(); i++) { for (int i = 0; i < configuration.getJobThreadCount(); i++) {
new JobRunner(application, i + 1, jobController).start(); new JobRunner(application, ++id, jobController, JobPredicate.NONE).start();
} }
for (JobPredicate predicate : configuration.getReservedJobRunners()) {
new JobRunner(application, ++id, jobController, predicate).start();
}
jobController.wakeUp(); jobController.wakeUp();
}); });
} }
@ -445,6 +452,7 @@ public class JobManager implements ConstraintObserver.Notifier {
private final JobStorage jobStorage; private final JobStorage jobStorage;
private final JobMigrator jobMigrator; private final JobMigrator jobMigrator;
private final JobTracker jobTracker; private final JobTracker jobTracker;
private final List<JobPredicate> reservedJobRunners;
private Configuration(int jobThreadCount, private Configuration(int jobThreadCount,
@NonNull ExecutorFactory executorFactory, @NonNull ExecutorFactory executorFactory,
@ -454,17 +462,19 @@ public class JobManager implements ConstraintObserver.Notifier {
@NonNull Data.Serializer dataSerializer, @NonNull Data.Serializer dataSerializer,
@NonNull JobStorage jobStorage, @NonNull JobStorage jobStorage,
@NonNull JobMigrator jobMigrator, @NonNull JobMigrator jobMigrator,
@NonNull JobTracker jobTracker) @NonNull JobTracker jobTracker,
@NonNull List<JobPredicate> reservedJobRunners)
{ {
this.executorFactory = executorFactory; this.executorFactory = executorFactory;
this.jobThreadCount = jobThreadCount; this.jobThreadCount = jobThreadCount;
this.jobInstantiator = jobInstantiator; this.jobInstantiator = jobInstantiator;
this.constraintInstantiator = constraintInstantiator; this.constraintInstantiator = constraintInstantiator;
this.constraintObservers = constraintObservers; this.constraintObservers = new ArrayList<>(constraintObservers);
this.dataSerializer = dataSerializer; this.dataSerializer = dataSerializer;
this.jobStorage = jobStorage; this.jobStorage = jobStorage;
this.jobMigrator = jobMigrator; this.jobMigrator = jobMigrator;
this.jobTracker = jobTracker; this.jobTracker = jobTracker;
this.reservedJobRunners = new ArrayList<>(reservedJobRunners);
} }
int getJobThreadCount() { int getJobThreadCount() {
@ -504,6 +514,10 @@ public class JobManager implements ConstraintObserver.Notifier {
return jobTracker; return jobTracker;
} }
@NonNull List<JobPredicate> getReservedJobRunners() {
return reservedJobRunners;
}
public static class Builder { public static class Builder {
private ExecutorFactory executorFactory = new DefaultExecutorFactory(); private ExecutorFactory executorFactory = new DefaultExecutorFactory();
@ -515,12 +529,18 @@ public class JobManager implements ConstraintObserver.Notifier {
private JobStorage jobStorage = null; private JobStorage jobStorage = null;
private JobMigrator jobMigrator = null; private JobMigrator jobMigrator = null;
private JobTracker jobTracker = new JobTracker(); private JobTracker jobTracker = new JobTracker();
private List<JobPredicate> reservedJobRunners = new ArrayList<>();
public @NonNull Builder setJobThreadCount(int jobThreadCount) { public @NonNull Builder setJobThreadCount(int jobThreadCount) {
this.jobThreadCount = jobThreadCount; this.jobThreadCount = jobThreadCount;
return this; return this;
} }
public @NonNull Builder addReservedJobRunner(@NonNull JobPredicate predicate) {
this.reservedJobRunners.add(predicate);
return this;
}
public @NonNull Builder setExecutorFactory(@NonNull ExecutorFactory executorFactory) { public @NonNull Builder setExecutorFactory(@NonNull ExecutorFactory executorFactory) {
this.executorFactory = executorFactory; this.executorFactory = executorFactory;
return this; return this;
@ -565,7 +585,8 @@ public class JobManager implements ConstraintObserver.Notifier {
dataSerializer, dataSerializer,
jobStorage, jobStorage,
jobMigrator, jobMigrator,
jobTracker); jobTracker,
reservedJobRunners);
} }
} }
} }

View File

@ -0,0 +1,11 @@
package org.thoughtcrime.securesms.jobmanager;
import androidx.annotation.NonNull;
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec;
public interface JobPredicate {
JobPredicate NONE = jobSpec -> true;
boolean shouldRun(@NonNull JobSpec jobSpec);
}

View File

@ -29,20 +29,22 @@ class JobRunner extends Thread {
private final Application application; private final Application application;
private final int id; private final int id;
private final JobController jobController; private final JobController jobController;
private final JobPredicate jobPredicate;
JobRunner(@NonNull Application application, int id, @NonNull JobController jobController) { JobRunner(@NonNull Application application, int id, @NonNull JobController jobController, @NonNull JobPredicate predicate) {
super("signal-JobRunner-" + id); super("signal-JobRunner-" + id);
this.application = application; this.application = application;
this.id = id; this.id = id;
this.jobController = jobController; this.jobController = jobController;
this.jobPredicate = predicate;
} }
@Override @Override
public synchronized void run() { public synchronized void run() {
//noinspection InfiniteLoopStatement //noinspection InfiniteLoopStatement
while (true) { while (true) {
Job job = jobController.pullNextEligibleJobForExecution(); Job job = jobController.pullNextEligibleJobForExecution(jobPredicate);
Job.Result result = run(job); Job.Result result = run(job);
jobController.onJobFinished(job); jobController.onJobFinished(job);

View File

@ -0,0 +1,27 @@
package org.thoughtcrime.securesms.jobmanager.impl;
import androidx.annotation.NonNull;
import org.thoughtcrime.securesms.jobmanager.JobPredicate;
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
/**
* A {@link JobPredicate} that will only run jobs with the provided factory keys.
*/
public final class FactoryJobPredicate implements JobPredicate {
private final Set<String> factories;
public FactoryJobPredicate(String... factories) {
this.factories = new HashSet<>(Arrays.asList(factories));
}
@Override
public boolean shouldRun(@NonNull JobSpec jobSpec) {
return factories.contains(jobSpec.getFactoryKey());
}
}