diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java index 43329406c8..7fab281fa1 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java @@ -7,6 +7,15 @@ import androidx.annotation.NonNull; import org.greenrobot.eventbus.EventBus; import org.thoughtcrime.securesms.BuildConfig; +import org.thoughtcrime.securesms.jobmanager.impl.FactoryJobPredicate; +import org.thoughtcrime.securesms.jobs.MarkerJob; +import org.thoughtcrime.securesms.jobs.PushDecryptMessageJob; +import org.thoughtcrime.securesms.jobs.PushGroupSendJob; +import org.thoughtcrime.securesms.jobs.PushMediaSendJob; +import org.thoughtcrime.securesms.jobs.PushProcessMessageJob; +import org.thoughtcrime.securesms.jobs.PushTextSendJob; +import org.thoughtcrime.securesms.jobs.ReactionSendJob; +import org.thoughtcrime.securesms.jobs.TypingSendJob; import org.thoughtcrime.securesms.messages.IncomingMessageProcessor; import org.thoughtcrime.securesms.crypto.storage.SignalProtocolStoreImpl; import org.thoughtcrime.securesms.database.DatabaseFactory; @@ -136,6 +145,8 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr .setConstraintObservers(JobManagerFactories.getConstraintObservers(context)) .setJobStorage(new FastJobStorage(DatabaseFactory.getJobDatabase(context), SignalExecutors.newCachedSingleThreadExecutor("signal-fast-job-storage"))) .setJobMigrator(new JobMigrator(TextSecurePreferences.getJobManagerVersion(context), JobManager.CURRENT_VERSION, JobManagerFactories.getJobMigrations(context))) + .addReservedJobRunner(new FactoryJobPredicate(PushDecryptMessageJob.KEY, PushProcessMessageJob.KEY, MarkerJob.KEY)) + .addReservedJobRunner(new FactoryJobPredicate(PushTextSendJob.KEY, PushMediaSendJob.KEY, PushGroupSendJob.KEY, ReactionSendJob.KEY, TypingSendJob.KEY)) .build()); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java index 0c41743d72..ee82e524cc 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java @@ -242,11 +242,11 @@ class JobController { * When the job returned from this method has been run, you must call {@link #onJobFinished(Job)}. */ @WorkerThread - synchronized @NonNull Job pullNextEligibleJobForExecution() { + synchronized @NonNull Job pullNextEligibleJobForExecution(@NonNull JobPredicate predicate) { try { Job job; - while ((job = getNextEligibleJobForExecution()) == null) { + while ((job = getNextEligibleJobForExecution(predicate)) == null) { if (runningJobs.isEmpty()) { debouncer.publish(callback::onEmpty); } @@ -387,8 +387,10 @@ class JobController { } @WorkerThread - private @Nullable Job getNextEligibleJobForExecution() { - List jobSpecs = jobStorage.getPendingJobsWithNoDependenciesInCreatedOrder(System.currentTimeMillis()); + private @Nullable Job getNextEligibleJobForExecution(@NonNull JobPredicate predicate) { + List jobSpecs = Stream.of(jobStorage.getPendingJobsWithNoDependenciesInCreatedOrder(System.currentTimeMillis())) + .filter(predicate::shouldRun) + .toList(); for (JobSpec jobSpec : jobSpecs) { List constraintSpecs = jobStorage.getConstraintSpecs(jobSpec.getId()); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java index d2c9137561..110ba9adc6 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java @@ -104,9 +104,16 @@ public class JobManager implements ConstraintObserver.Notifier { */ public void beginJobLoop() { runOnExecutor(()-> { + int id = 0; + for (int i = 0; i < configuration.getJobThreadCount(); i++) { - new JobRunner(application, i + 1, jobController).start(); + new JobRunner(application, ++id, jobController, JobPredicate.NONE).start(); } + + for (JobPredicate predicate : configuration.getReservedJobRunners()) { + new JobRunner(application, ++id, jobController, predicate).start(); + } + jobController.wakeUp(); }); } @@ -445,6 +452,7 @@ public class JobManager implements ConstraintObserver.Notifier { private final JobStorage jobStorage; private final JobMigrator jobMigrator; private final JobTracker jobTracker; + private final List reservedJobRunners; private Configuration(int jobThreadCount, @NonNull ExecutorFactory executorFactory, @@ -454,17 +462,19 @@ public class JobManager implements ConstraintObserver.Notifier { @NonNull Data.Serializer dataSerializer, @NonNull JobStorage jobStorage, @NonNull JobMigrator jobMigrator, - @NonNull JobTracker jobTracker) + @NonNull JobTracker jobTracker, + @NonNull List reservedJobRunners) { this.executorFactory = executorFactory; this.jobThreadCount = jobThreadCount; this.jobInstantiator = jobInstantiator; this.constraintInstantiator = constraintInstantiator; - this.constraintObservers = constraintObservers; + this.constraintObservers = new ArrayList<>(constraintObservers); this.dataSerializer = dataSerializer; this.jobStorage = jobStorage; this.jobMigrator = jobMigrator; this.jobTracker = jobTracker; + this.reservedJobRunners = new ArrayList<>(reservedJobRunners); } int getJobThreadCount() { @@ -504,6 +514,10 @@ public class JobManager implements ConstraintObserver.Notifier { return jobTracker; } + @NonNull List getReservedJobRunners() { + return reservedJobRunners; + } + public static class Builder { private ExecutorFactory executorFactory = new DefaultExecutorFactory(); @@ -515,12 +529,18 @@ public class JobManager implements ConstraintObserver.Notifier { private JobStorage jobStorage = null; private JobMigrator jobMigrator = null; private JobTracker jobTracker = new JobTracker(); + private List reservedJobRunners = new ArrayList<>(); public @NonNull Builder setJobThreadCount(int jobThreadCount) { this.jobThreadCount = jobThreadCount; return this; } + public @NonNull Builder addReservedJobRunner(@NonNull JobPredicate predicate) { + this.reservedJobRunners.add(predicate); + return this; + } + public @NonNull Builder setExecutorFactory(@NonNull ExecutorFactory executorFactory) { this.executorFactory = executorFactory; return this; @@ -565,7 +585,8 @@ public class JobManager implements ConstraintObserver.Notifier { dataSerializer, jobStorage, jobMigrator, - jobTracker); + jobTracker, + reservedJobRunners); } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobPredicate.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobPredicate.java new file mode 100644 index 0000000000..5ef44a6c89 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobPredicate.java @@ -0,0 +1,11 @@ +package org.thoughtcrime.securesms.jobmanager; + +import androidx.annotation.NonNull; + +import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec; + +public interface JobPredicate { + JobPredicate NONE = jobSpec -> true; + + boolean shouldRun(@NonNull JobSpec jobSpec); +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobRunner.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobRunner.java index aa71ba4d76..56b674b20d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobRunner.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobRunner.java @@ -29,20 +29,22 @@ class JobRunner extends Thread { private final Application application; private final int id; private final JobController jobController; + private final JobPredicate jobPredicate; - JobRunner(@NonNull Application application, int id, @NonNull JobController jobController) { + JobRunner(@NonNull Application application, int id, @NonNull JobController jobController, @NonNull JobPredicate predicate) { super("signal-JobRunner-" + id); this.application = application; this.id = id; this.jobController = jobController; + this.jobPredicate = predicate; } @Override public synchronized void run() { //noinspection InfiniteLoopStatement while (true) { - Job job = jobController.pullNextEligibleJobForExecution(); + Job job = jobController.pullNextEligibleJobForExecution(jobPredicate); Job.Result result = run(job); jobController.onJobFinished(job); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/FactoryJobPredicate.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/FactoryJobPredicate.java new file mode 100644 index 0000000000..5959a460ea --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/FactoryJobPredicate.java @@ -0,0 +1,27 @@ +package org.thoughtcrime.securesms.jobmanager.impl; + +import androidx.annotation.NonNull; + +import org.thoughtcrime.securesms.jobmanager.JobPredicate; +import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +/** + * A {@link JobPredicate} that will only run jobs with the provided factory keys. + */ +public final class FactoryJobPredicate implements JobPredicate { + + private final Set factories; + + public FactoryJobPredicate(String... factories) { + this.factories = new HashSet<>(Arrays.asList(factories)); + } + + @Override + public boolean shouldRun(@NonNull JobSpec jobSpec) { + return factories.contains(jobSpec.getFactoryKey()); + } +}