Add ability to listen to jobs based on a filter.

This commit is contained in:
Greyson Parrelli 2020-03-06 16:56:01 -05:00
parent 033bf77cbb
commit 4d0dbbc6cd
4 changed files with 100 additions and 52 deletions

View File

@ -5,6 +5,7 @@ import androidx.annotation.WorkerThread;
import org.thoughtcrime.securesms.IncomingMessageProcessor; import org.thoughtcrime.securesms.IncomingMessageProcessor;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.JobManager; import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobmanager.JobTracker; import org.thoughtcrime.securesms.jobmanager.JobTracker;
import org.thoughtcrime.securesms.jobs.MarkerJob; import org.thoughtcrime.securesms.jobs.MarkerJob;
@ -75,7 +76,7 @@ public class RestStrategy implements MessageRetriever.Strategy {
jobManager.addListener(markerJob.getId(), new JobTracker.JobListener() { jobManager.addListener(markerJob.getId(), new JobTracker.JobListener() {
@Override @Override
public void onStateChanged(@NonNull JobTracker.JobState jobState) { public void onStateChanged(@NonNull Job job, @NonNull JobTracker.JobState jobState) {
if (jobState.isComplete()) { if (jobState.isComplete()) {
jobManager.removeListener(this); jobManager.removeListener(this);
latch.countDown(); latch.countDown();

View File

@ -85,7 +85,7 @@ class JobController {
if (chainExceedsMaximumInstances(chain)) { if (chainExceedsMaximumInstances(chain)) {
Job solo = chain.get(0).get(0); Job solo = chain.get(0).get(0);
jobTracker.onStateChange(solo.getId(), JobTracker.JobState.IGNORED); jobTracker.onStateChange(solo, JobTracker.JobState.IGNORED);
Log.w(TAG, JobLogger.format(solo, "Already at the max instance count of " + solo.getParameters().getMaxInstances() + ". Skipping.")); Log.w(TAG, JobLogger.format(solo, "Already at the max instance count of " + solo.getParameters().getMaxInstances() + ". Skipping."));
return; return;
} }
@ -101,7 +101,7 @@ class JobController {
List<List<Job>> chain = Collections.singletonList(Collections.singletonList(job)); List<List<Job>> chain = Collections.singletonList(Collections.singletonList(job));
if (chainExceedsMaximumInstances(chain)) { if (chainExceedsMaximumInstances(chain)) {
jobTracker.onStateChange(job.getId(), JobTracker.JobState.IGNORED); jobTracker.onStateChange(job, JobTracker.JobState.IGNORED);
Log.w(TAG, JobLogger.format(job, "Already at the max instance count of " + job.getParameters().getMaxInstances() + ". Skipping.")); Log.w(TAG, JobLogger.format(job, "Already at the max instance count of " + job.getParameters().getMaxInstances() + ". Skipping."));
return; return;
} }
@ -149,7 +149,7 @@ class JobController {
String serializedData = dataSerializer.serialize(job.serialize()); String serializedData = dataSerializer.serialize(job.serialize());
jobStorage.updateJobAfterRetry(job.getId(), false, nextRunAttempt, nextRunAttemptTime, serializedData); jobStorage.updateJobAfterRetry(job.getId(), false, nextRunAttempt, nextRunAttemptTime, serializedData);
jobTracker.onStateChange(job.getId(), JobTracker.JobState.PENDING); jobTracker.onStateChange(job, JobTracker.JobState.PENDING);
List<Constraint> constraints = Stream.of(jobStorage.getConstraintSpecs(job.getId())) List<Constraint> constraints = Stream.of(jobStorage.getConstraintSpecs(job.getId()))
.map(ConstraintSpec::getFactoryKey) .map(ConstraintSpec::getFactoryKey)
@ -172,7 +172,7 @@ class JobController {
@WorkerThread @WorkerThread
synchronized void onSuccess(@NonNull Job job) { synchronized void onSuccess(@NonNull Job job) {
jobStorage.deleteJob(job.getId()); jobStorage.deleteJob(job.getId());
jobTracker.onStateChange(job.getId(), JobTracker.JobState.SUCCESS); jobTracker.onStateChange(job, JobTracker.JobState.SUCCESS);
notifyAll(); notifyAll();
} }
@ -196,7 +196,7 @@ class JobController {
all.addAll(dependents); all.addAll(dependents);
jobStorage.deleteJobs(Stream.of(all).map(Job::getId).toList()); jobStorage.deleteJobs(Stream.of(all).map(Job::getId).toList());
Stream.of(all).forEach(j -> jobTracker.onStateChange(j.getId(), JobTracker.JobState.FAILURE)); Stream.of(all).forEach(j -> jobTracker.onStateChange(j, JobTracker.JobState.FAILURE));
return dependents; return dependents;
} }
@ -224,7 +224,7 @@ class JobController {
jobStorage.updateJobRunningState(job.getId(), true); jobStorage.updateJobRunningState(job.getId(), true);
runningJobs.put(job.getId(), job); runningJobs.put(job.getId(), job);
jobTracker.onStateChange(job.getId(), JobTracker.JobState.RUNNING); jobTracker.onStateChange(job, JobTracker.JobState.RUNNING);
return job; return job;
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -100,13 +100,21 @@ public class JobManager implements ConstraintObserver.Notifier {
}); });
} }
/**
* Convenience method for {@link #addListener(JobTracker.JobFilter, JobTracker.JobListener)} that
* takes in an ID to filter on.
*/
public void addListener(@NonNull String id, @NonNull JobTracker.JobListener listener) {
jobTracker.addListener(new JobIdFilter(id), listener);
}
/** /**
* Add a listener to subscribe to job state updates. Listeners will be invoked on an arbitrary * Add a listener to subscribe to job state updates. Listeners will be invoked on an arbitrary
* background thread. You must eventually call {@link #removeListener(JobTracker.JobListener)} to avoid * background thread. You must eventually call {@link #removeListener(JobTracker.JobListener)} to avoid
* memory leaks. * memory leaks.
*/ */
public void addListener(@NonNull String id, @NonNull JobTracker.JobListener listener) { public void addListener(@NonNull JobTracker.JobFilter filter, @NonNull JobTracker.JobListener listener) {
jobTracker.addListener(id, listener); jobTracker.addListener(filter, listener);
} }
/** /**
@ -127,7 +135,7 @@ public class JobManager implements ConstraintObserver.Notifier {
* Enqueues a single job that depends on a collection of job ID's. * Enqueues a single job that depends on a collection of job ID's.
*/ */
public void add(@NonNull Job job, @NonNull Collection<String> dependsOn) { public void add(@NonNull Job job, @NonNull Collection<String> dependsOn) {
jobTracker.onStateChange(job.getId(), JobTracker.JobState.PENDING); jobTracker.onStateChange(job, JobTracker.JobState.PENDING);
executor.execute(() -> { executor.execute(() -> {
jobController.submitJobWithExistingDependencies(job, dependsOn); jobController.submitJobWithExistingDependencies(job, dependsOn);
@ -177,7 +185,7 @@ public class JobManager implements ConstraintObserver.Notifier {
addListener(job.getId(), new JobTracker.JobListener() { addListener(job.getId(), new JobTracker.JobListener() {
@Override @Override
public void onStateChanged(@NonNull JobTracker.JobState jobState) { public void onStateChanged(@NonNull Job job, @NonNull JobTracker.JobState jobState) {
if (jobState.isComplete()) { if (jobState.isComplete()) {
removeListener(this); removeListener(this);
resultState.set(jobState); resultState.set(jobState);
@ -248,7 +256,7 @@ public class JobManager implements ConstraintObserver.Notifier {
private void enqueueChain(@NonNull Chain chain) { private void enqueueChain(@NonNull Chain chain) {
for (List<Job> jobList : chain.getJobListChain()) { for (List<Job> jobList : chain.getJobListChain()) {
for (Job job : jobList) { for (Job job : jobList) {
jobTracker.onStateChange(job.getId(), JobTracker.JobState.PENDING); jobTracker.onStateChange(job, JobTracker.JobState.PENDING);
} }
} }
@ -270,6 +278,19 @@ public class JobManager implements ConstraintObserver.Notifier {
void onQueueEmpty(); void onQueueEmpty();
} }
public static class JobIdFilter implements JobTracker.JobFilter {
private final String id;
public JobIdFilter(@NonNull String id) {
this.id = id;
}
@Override
public boolean matches(@NonNull Job job) {
return id.equals(job.getId());
}
}
/** /**
* Allows enqueuing work that depends on each other. Jobs that appear later in the chain will * Allows enqueuing work that depends on each other. Jobs that appear later in the chain will
* only run after all jobs earlier in the chain have been completed. If a job fails, all jobs * only run after all jobs earlier in the chain have been completed. If a job fails, all jobs

View File

@ -3,13 +3,15 @@ package org.thoughtcrime.securesms.jobmanager;
import androidx.annotation.NonNull; import androidx.annotation.NonNull;
import androidx.annotation.Nullable; import androidx.annotation.Nullable;
import com.annimon.stream.Stream;
import org.thoughtcrime.securesms.util.LRUCache; import org.thoughtcrime.securesms.util.LRUCache;
import org.thoughtcrime.securesms.util.concurrent.SignalExecutors; import org.thoughtcrime.securesms.util.concurrent.SignalExecutors;
import java.util.Collection; import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
/** /**
@ -17,11 +19,13 @@ import java.util.concurrent.Executor;
*/ */
public class JobTracker { public class JobTracker {
private final Map<String, TrackingState> trackingStates; private final Map<String, JobInfo> jobInfos;
private final List<ListenerInfo> jobListeners;
private final Executor listenerExecutor; private final Executor listenerExecutor;
JobTracker() { JobTracker() {
this.trackingStates = new LRUCache<>(1000); this.jobInfos = new LRUCache<>(1000);
this.jobListeners = new ArrayList<>();
this.listenerExecutor = SignalExecutors.BOUNDED; this.listenerExecutor = SignalExecutors.BOUNDED;
} }
@ -30,54 +34,63 @@ public class JobTracker {
* background thread. You must eventually call {@link #removeListener(JobListener)} to avoid * background thread. You must eventually call {@link #removeListener(JobListener)} to avoid
* memory leaks. * memory leaks.
*/ */
synchronized void addListener(@NonNull String id, @NonNull JobListener jobListener) { synchronized void addListener(@NonNull JobFilter filter, @NonNull JobListener listener) {
TrackingState state = getOrCreateTrackingState(id); jobListeners.add(new ListenerInfo(filter, listener));
JobState currentJobState = state.getJobState();
state.addListener(jobListener); Stream.of(jobInfos.values())
.filter(info -> info.getJobState() != null)
if (currentJobState != null) { .filter(info -> filter.matches(info.getJob()))
listenerExecutor.execute(() -> jobListener.onStateChanged(currentJobState)); .forEach(state-> {
} //noinspection ConstantConditions We already filter for nulls above
listenerExecutor.execute(() -> listener.onStateChanged(state.getJob(), state.getJobState()));
});
} }
/** /**
* Unsubscribe the provided listener from all job updates. * Unsubscribe the provided listener from all job updates.
*/ */
synchronized void removeListener(@NonNull JobListener jobListener) { synchronized void removeListener(@NonNull JobListener listener) {
Collection<TrackingState> allTrackingState = trackingStates.values(); Iterator<ListenerInfo> iter = jobListeners.iterator();
for (TrackingState state : allTrackingState) { while (iter.hasNext()) {
state.removeListener(jobListener); if (listener.equals(iter.next().getListener())) {
iter.remove();
}
} }
} }
/** /**
* Update the state of a job with the associated ID. * Update the state of a job with the associated ID.
*/ */
synchronized void onStateChange(@NonNull String id, @NonNull JobState jobState) { synchronized void onStateChange(@NonNull Job job, @NonNull JobState state) {
TrackingState trackingState = getOrCreateTrackingState(id); getOrCreateJobInfo(job).setJobState(state);
trackingState.setJobState(jobState);
for (JobListener listener : trackingState.getListeners()) { Stream.of(jobListeners)
listenerExecutor.execute(() -> listener.onStateChanged(jobState)); .filter(info -> info.getFilter().matches(job))
} .map(ListenerInfo::getListener)
.forEach(listener -> {
listenerExecutor.execute(() -> listener.onStateChanged(job, state));
});
} }
private @NonNull TrackingState getOrCreateTrackingState(@NonNull String id) { private @NonNull JobInfo getOrCreateJobInfo(@NonNull Job job) {
TrackingState state = trackingStates.get(id); JobInfo jobInfo = jobInfos.get(job.getId());
if (state == null) { if (jobInfo == null) {
state = new TrackingState(); jobInfo = new JobInfo(job);
} }
trackingStates.put(id, state); jobInfos.put(job.getId(), jobInfo);
return state; return jobInfo;
}
public interface JobFilter {
boolean matches(@NonNull Job job);
} }
public interface JobListener { public interface JobListener {
void onStateChanged(@NonNull JobState jobState); void onStateChanged(@NonNull Job job, @NonNull JobState jobState);
} }
public enum JobState { public enum JobState {
@ -94,21 +107,34 @@ public class JobTracker {
} }
} }
private static class TrackingState { private static class ListenerInfo {
private final JobFilter filter;
private final JobListener listener;
private ListenerInfo(JobFilter filter, JobListener listener) {
this.filter = filter;
this.listener = listener;
}
@NonNull JobFilter getFilter() {
return filter;
}
@NonNull JobListener getListener() {
return listener;
}
}
private static class JobInfo {
private final Job job;
private JobState jobState; private JobState jobState;
private final CopyOnWriteArraySet<JobListener> listeners = new CopyOnWriteArraySet<>(); private JobInfo(Job job) {
this.job = job;
void addListener(@NonNull JobListener jobListener) {
listeners.add(jobListener);
} }
void removeListener(@NonNull JobListener jobListener) { @NonNull Job getJob() {
listeners.remove(jobListener); return job;
}
@NonNull Collection<JobListener> getListeners() {
return listeners;
} }
void setJobState(@NonNull JobState jobState) { void setJobState(@NonNull JobState jobState) {