diff --git a/app/src/main/java/org/thoughtcrime/securesms/gcm/RestStrategy.java b/app/src/main/java/org/thoughtcrime/securesms/gcm/RestStrategy.java index 1df1ff899f..c3822975ba 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/gcm/RestStrategy.java +++ b/app/src/main/java/org/thoughtcrime/securesms/gcm/RestStrategy.java @@ -5,6 +5,7 @@ import androidx.annotation.WorkerThread; import org.thoughtcrime.securesms.IncomingMessageProcessor; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; +import org.thoughtcrime.securesms.jobmanager.Job; import org.thoughtcrime.securesms.jobmanager.JobManager; import org.thoughtcrime.securesms.jobmanager.JobTracker; import org.thoughtcrime.securesms.jobs.MarkerJob; @@ -75,7 +76,7 @@ public class RestStrategy implements MessageRetriever.Strategy { jobManager.addListener(markerJob.getId(), new JobTracker.JobListener() { @Override - public void onStateChanged(@NonNull JobTracker.JobState jobState) { + public void onStateChanged(@NonNull Job job, @NonNull JobTracker.JobState jobState) { if (jobState.isComplete()) { jobManager.removeListener(this); latch.countDown(); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java index 539677d254..340fad3af3 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java @@ -85,7 +85,7 @@ class JobController { if (chainExceedsMaximumInstances(chain)) { Job solo = chain.get(0).get(0); - jobTracker.onStateChange(solo.getId(), JobTracker.JobState.IGNORED); + jobTracker.onStateChange(solo, JobTracker.JobState.IGNORED); Log.w(TAG, JobLogger.format(solo, "Already at the max instance count of " + solo.getParameters().getMaxInstances() + ". Skipping.")); return; } @@ -101,7 +101,7 @@ class JobController { List> chain = Collections.singletonList(Collections.singletonList(job)); if (chainExceedsMaximumInstances(chain)) { - jobTracker.onStateChange(job.getId(), JobTracker.JobState.IGNORED); + jobTracker.onStateChange(job, JobTracker.JobState.IGNORED); Log.w(TAG, JobLogger.format(job, "Already at the max instance count of " + job.getParameters().getMaxInstances() + ". Skipping.")); return; } @@ -149,7 +149,7 @@ class JobController { String serializedData = dataSerializer.serialize(job.serialize()); jobStorage.updateJobAfterRetry(job.getId(), false, nextRunAttempt, nextRunAttemptTime, serializedData); - jobTracker.onStateChange(job.getId(), JobTracker.JobState.PENDING); + jobTracker.onStateChange(job, JobTracker.JobState.PENDING); List constraints = Stream.of(jobStorage.getConstraintSpecs(job.getId())) .map(ConstraintSpec::getFactoryKey) @@ -172,7 +172,7 @@ class JobController { @WorkerThread synchronized void onSuccess(@NonNull Job job) { jobStorage.deleteJob(job.getId()); - jobTracker.onStateChange(job.getId(), JobTracker.JobState.SUCCESS); + jobTracker.onStateChange(job, JobTracker.JobState.SUCCESS); notifyAll(); } @@ -196,7 +196,7 @@ class JobController { all.addAll(dependents); jobStorage.deleteJobs(Stream.of(all).map(Job::getId).toList()); - Stream.of(all).forEach(j -> jobTracker.onStateChange(j.getId(), JobTracker.JobState.FAILURE)); + Stream.of(all).forEach(j -> jobTracker.onStateChange(j, JobTracker.JobState.FAILURE)); return dependents; } @@ -224,7 +224,7 @@ class JobController { jobStorage.updateJobRunningState(job.getId(), true); runningJobs.put(job.getId(), job); - jobTracker.onStateChange(job.getId(), JobTracker.JobState.RUNNING); + jobTracker.onStateChange(job, JobTracker.JobState.RUNNING); return job; } catch (InterruptedException e) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java index 47ef724243..c0cbec2083 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java @@ -100,13 +100,21 @@ public class JobManager implements ConstraintObserver.Notifier { }); } + /** + * Convenience method for {@link #addListener(JobTracker.JobFilter, JobTracker.JobListener)} that + * takes in an ID to filter on. + */ + public void addListener(@NonNull String id, @NonNull JobTracker.JobListener listener) { + jobTracker.addListener(new JobIdFilter(id), listener); + } + /** * Add a listener to subscribe to job state updates. Listeners will be invoked on an arbitrary * background thread. You must eventually call {@link #removeListener(JobTracker.JobListener)} to avoid * memory leaks. */ - public void addListener(@NonNull String id, @NonNull JobTracker.JobListener listener) { - jobTracker.addListener(id, listener); + public void addListener(@NonNull JobTracker.JobFilter filter, @NonNull JobTracker.JobListener listener) { + jobTracker.addListener(filter, listener); } /** @@ -127,7 +135,7 @@ public class JobManager implements ConstraintObserver.Notifier { * Enqueues a single job that depends on a collection of job ID's. */ public void add(@NonNull Job job, @NonNull Collection dependsOn) { - jobTracker.onStateChange(job.getId(), JobTracker.JobState.PENDING); + jobTracker.onStateChange(job, JobTracker.JobState.PENDING); executor.execute(() -> { jobController.submitJobWithExistingDependencies(job, dependsOn); @@ -177,7 +185,7 @@ public class JobManager implements ConstraintObserver.Notifier { addListener(job.getId(), new JobTracker.JobListener() { @Override - public void onStateChanged(@NonNull JobTracker.JobState jobState) { + public void onStateChanged(@NonNull Job job, @NonNull JobTracker.JobState jobState) { if (jobState.isComplete()) { removeListener(this); resultState.set(jobState); @@ -248,7 +256,7 @@ public class JobManager implements ConstraintObserver.Notifier { private void enqueueChain(@NonNull Chain chain) { for (List jobList : chain.getJobListChain()) { for (Job job : jobList) { - jobTracker.onStateChange(job.getId(), JobTracker.JobState.PENDING); + jobTracker.onStateChange(job, JobTracker.JobState.PENDING); } } @@ -270,6 +278,19 @@ public class JobManager implements ConstraintObserver.Notifier { void onQueueEmpty(); } + public static class JobIdFilter implements JobTracker.JobFilter { + private final String id; + + public JobIdFilter(@NonNull String id) { + this.id = id; + } + + @Override + public boolean matches(@NonNull Job job) { + return id.equals(job.getId()); + } + } + /** * Allows enqueuing work that depends on each other. Jobs that appear later in the chain will * only run after all jobs earlier in the chain have been completed. If a job fails, all jobs diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobTracker.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobTracker.java index 1b51c9bb56..d00509462d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobTracker.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobTracker.java @@ -3,13 +3,15 @@ package org.thoughtcrime.securesms.jobmanager; import androidx.annotation.NonNull; import androidx.annotation.Nullable; +import com.annimon.stream.Stream; + import org.thoughtcrime.securesms.util.LRUCache; import org.thoughtcrime.securesms.util.concurrent.SignalExecutors; -import java.util.Collection; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.Executor; /** @@ -17,11 +19,13 @@ import java.util.concurrent.Executor; */ public class JobTracker { - private final Map trackingStates; - private final Executor listenerExecutor; + private final Map jobInfos; + private final List jobListeners; + private final Executor listenerExecutor; JobTracker() { - this.trackingStates = new LRUCache<>(1000); + this.jobInfos = new LRUCache<>(1000); + this.jobListeners = new ArrayList<>(); this.listenerExecutor = SignalExecutors.BOUNDED; } @@ -30,54 +34,63 @@ public class JobTracker { * background thread. You must eventually call {@link #removeListener(JobListener)} to avoid * memory leaks. */ - synchronized void addListener(@NonNull String id, @NonNull JobListener jobListener) { - TrackingState state = getOrCreateTrackingState(id); - JobState currentJobState = state.getJobState(); + synchronized void addListener(@NonNull JobFilter filter, @NonNull JobListener listener) { + jobListeners.add(new ListenerInfo(filter, listener)); - state.addListener(jobListener); - - if (currentJobState != null) { - listenerExecutor.execute(() -> jobListener.onStateChanged(currentJobState)); - } + Stream.of(jobInfos.values()) + .filter(info -> info.getJobState() != null) + .filter(info -> filter.matches(info.getJob())) + .forEach(state-> { + //noinspection ConstantConditions We already filter for nulls above + listenerExecutor.execute(() -> listener.onStateChanged(state.getJob(), state.getJobState())); + }); } /** * Unsubscribe the provided listener from all job updates. */ - synchronized void removeListener(@NonNull JobListener jobListener) { - Collection allTrackingState = trackingStates.values(); + synchronized void removeListener(@NonNull JobListener listener) { + Iterator iter = jobListeners.iterator(); - for (TrackingState state : allTrackingState) { - state.removeListener(jobListener); + while (iter.hasNext()) { + if (listener.equals(iter.next().getListener())) { + iter.remove(); + } } } /** * Update the state of a job with the associated ID. */ - synchronized void onStateChange(@NonNull String id, @NonNull JobState jobState) { - TrackingState trackingState = getOrCreateTrackingState(id); - trackingState.setJobState(jobState); + synchronized void onStateChange(@NonNull Job job, @NonNull JobState state) { + getOrCreateJobInfo(job).setJobState(state); - for (JobListener listener : trackingState.getListeners()) { - listenerExecutor.execute(() -> listener.onStateChanged(jobState)); - } + Stream.of(jobListeners) + .filter(info -> info.getFilter().matches(job)) + .map(ListenerInfo::getListener) + .forEach(listener -> { + listenerExecutor.execute(() -> listener.onStateChanged(job, state)); + }); } - private @NonNull TrackingState getOrCreateTrackingState(@NonNull String id) { - TrackingState state = trackingStates.get(id); + private @NonNull JobInfo getOrCreateJobInfo(@NonNull Job job) { + JobInfo jobInfo = jobInfos.get(job.getId()); - if (state == null) { - state = new TrackingState(); + if (jobInfo == null) { + jobInfo = new JobInfo(job); } - trackingStates.put(id, state); + jobInfos.put(job.getId(), jobInfo); - return state; + return jobInfo; + } + + public interface JobFilter { + boolean matches(@NonNull Job job); } public interface JobListener { - void onStateChanged(@NonNull JobState jobState); + void onStateChanged(@NonNull Job job, @NonNull JobState jobState); } public enum JobState { @@ -94,21 +107,34 @@ public class JobTracker { } } - private static class TrackingState { - private JobState jobState; + private static class ListenerInfo { + private final JobFilter filter; + private final JobListener listener; - private final CopyOnWriteArraySet listeners = new CopyOnWriteArraySet<>(); - - void addListener(@NonNull JobListener jobListener) { - listeners.add(jobListener); + private ListenerInfo(JobFilter filter, JobListener listener) { + this.filter = filter; + this.listener = listener; } - void removeListener(@NonNull JobListener jobListener) { - listeners.remove(jobListener); + @NonNull JobFilter getFilter() { + return filter; } - @NonNull Collection getListeners() { - return listeners; + @NonNull JobListener getListener() { + return listener; + } + } + + private static class JobInfo { + private final Job job; + private JobState jobState; + + private JobInfo(Job job) { + this.job = job; + } + + @NonNull Job getJob() { + return job; } void setJobState(@NonNull JobState jobState) {