From ccb18cd46ccd13f9755e0851bec910467af9f8c2 Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Tue, 15 Oct 2019 16:46:29 -0400 Subject: [PATCH] Added JobTracker. --- .../securesms/jobmanager/Job.java | 47 ++++--- .../securesms/jobmanager/JobController.java | 14 +- .../securesms/jobmanager/JobManager.java | 40 +++++- .../securesms/jobmanager/JobTracker.java | 122 ++++++++++++++++++ 4 files changed, 197 insertions(+), 26 deletions(-) create mode 100644 src/org/thoughtcrime/securesms/jobmanager/JobTracker.java diff --git a/src/org/thoughtcrime/securesms/jobmanager/Job.java b/src/org/thoughtcrime/securesms/jobmanager/Job.java index 45665440a2..fd74667283 100644 --- a/src/org/thoughtcrime/securesms/jobmanager/Job.java +++ b/src/org/thoughtcrime/securesms/jobmanager/Job.java @@ -10,6 +10,7 @@ import org.thoughtcrime.securesms.logging.Log; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.TimeUnit; /** @@ -30,7 +31,6 @@ public abstract class Job { private final Parameters parameters; - private String id; private int runAttempt; private long nextRunAttemptTime; @@ -41,7 +41,7 @@ public abstract class Job { } public final String getId() { - return id; + return parameters.getId(); } public final @NonNull Parameters getParameters() { @@ -64,11 +64,6 @@ public abstract class Job { this.context = context; } - /** Should only be invoked by {@link JobController} */ - final void setId(@NonNull String id) { - this.id = id; - } - /** Should only be invoked by {@link JobController} */ final void setRunAttempt(int runAttempt) { this.runAttempt = runAttempt; @@ -203,6 +198,7 @@ public abstract class Job { public static final int IMMORTAL = -1; public static final int UNLIMITED = -1; + private final String id; private final long createTime; private final long lifespan; private final int maxAttempts; @@ -211,7 +207,8 @@ public abstract class Job { private final String queue; private final List constraintKeys; - private Parameters(long createTime, + private Parameters(@NonNull String id, + long createTime, long lifespan, int maxAttempts, long maxBackoff, @@ -219,6 +216,7 @@ public abstract class Job { @Nullable String queue, @NonNull List constraintKeys) { + this.id = id; this.createTime = createTime; this.lifespan = lifespan; this.maxAttempts = maxAttempts; @@ -228,41 +226,46 @@ public abstract class Job { this.constraintKeys = constraintKeys; } - public long getCreateTime() { + @NonNull String getId() { + return id; + } + + long getCreateTime() { return createTime; } - public long getLifespan() { + long getLifespan() { return lifespan; } - public int getMaxAttempts() { + int getMaxAttempts() { return maxAttempts; } - public long getMaxBackoff() { + long getMaxBackoff() { return maxBackoff; } - public int getMaxInstances() { + int getMaxInstances() { return maxInstances; } - public @Nullable String getQueue() { + @Nullable String getQueue() { return queue; } - public List getConstraintKeys() { + List getConstraintKeys() { return constraintKeys; } public Builder toBuilder() { - return new Builder(createTime, maxBackoff, lifespan, maxAttempts, maxInstances, queue, constraintKeys); + return new Builder(id, createTime, maxBackoff, lifespan, maxAttempts, maxInstances, queue, constraintKeys); } public static final class Builder { + private String id; private long createTime; private long maxBackoff; private long lifespan; @@ -272,10 +275,15 @@ public abstract class Job { private List constraintKeys; public Builder() { - this(System.currentTimeMillis(), TimeUnit.SECONDS.toMillis(30), IMMORTAL, 1, UNLIMITED, null, new LinkedList<>()); + this(UUID.randomUUID().toString()); } - private Builder(long createTime, + Builder(@NonNull String id) { + this(id, System.currentTimeMillis(), TimeUnit.SECONDS.toMillis(30), IMMORTAL, 1, UNLIMITED, null, new LinkedList<>()); + } + + private Builder(@NonNull String id, + long createTime, long maxBackoff, long lifespan, int maxAttempts, @@ -283,6 +291,7 @@ public abstract class Job { @Nullable String queue, @NonNull List constraintKeys) { + this.id = id; this.createTime = createTime; this.maxBackoff = maxBackoff; this.lifespan = lifespan; @@ -368,7 +377,7 @@ public abstract class Job { } public @NonNull Parameters build() { - return new Parameters(createTime, lifespan, maxAttempts, maxBackoff, maxInstances, queue, constraintKeys); + return new Parameters(id, createTime, lifespan, maxAttempts, maxBackoff, maxInstances, queue, constraintKeys); } } } diff --git a/src/org/thoughtcrime/securesms/jobmanager/JobController.java b/src/org/thoughtcrime/securesms/jobmanager/JobController.java index 722a27fe65..9c8e4e0f6a 100644 --- a/src/org/thoughtcrime/securesms/jobmanager/JobController.java +++ b/src/org/thoughtcrime/securesms/jobmanager/JobController.java @@ -36,6 +36,7 @@ class JobController { private final JobInstantiator jobInstantiator; private final ConstraintInstantiator constraintInstantiator; private final Data.Serializer dataSerializer; + private final JobTracker jobTracker; private final Scheduler scheduler; private final Debouncer debouncer; private final Callback callback; @@ -46,6 +47,7 @@ class JobController { @NonNull JobInstantiator jobInstantiator, @NonNull ConstraintInstantiator constraintInstantiator, @NonNull Data.Serializer dataSerializer, + @NonNull JobTracker jobTracker, @NonNull Scheduler scheduler, @NonNull Debouncer debouncer, @NonNull Callback callback) @@ -55,6 +57,7 @@ class JobController { this.jobInstantiator = jobInstantiator; this.constraintInstantiator = constraintInstantiator; this.dataSerializer = dataSerializer; + this.jobTracker = jobTracker; this.scheduler = scheduler; this.debouncer = debouncer; this.callback = callback; @@ -82,6 +85,7 @@ class JobController { if (chainExceedsMaximumInstances(chain)) { Job solo = chain.get(0).get(0); + jobTracker.onStateChange(solo.getId(), JobTracker.JobState.IGNORED); Log.w(TAG, JobLogger.format(solo, "Already at the max instance count of " + solo.getParameters().getMaxInstances() + ". Skipping.")); return; } @@ -98,6 +102,7 @@ class JobController { long nextRunAttemptTime = calculateNextRunAttemptTime(System.currentTimeMillis(), nextRunAttempt, job.getParameters().getMaxBackoff()); jobStorage.updateJobAfterRetry(job.getId(), false, nextRunAttempt, nextRunAttemptTime); + jobTracker.onStateChange(job.getId(), JobTracker.JobState.PENDING); List constraints = Stream.of(jobStorage.getConstraintSpecs(job.getId())) .map(ConstraintSpec::getFactoryKey) @@ -120,6 +125,7 @@ class JobController { @WorkerThread synchronized void onSuccess(@NonNull Job job) { jobStorage.deleteJob(job.getId()); + jobTracker.onStateChange(job.getId(), JobTracker.JobState.SUCCESS); notifyAll(); } @@ -143,6 +149,7 @@ class JobController { all.addAll(dependents); jobStorage.deleteJobs(Stream.of(all).map(Job::getId).toList()); + Stream.of(all).forEach(j -> jobTracker.onStateChange(j.getId(), JobTracker.JobState.FAILURE)); return dependents; } @@ -170,6 +177,7 @@ class JobController { jobStorage.updateJobRunningState(job.getId(), true); runningJobs.add(job.getId()); + jobTracker.onStateChange(job.getId(), JobTracker.JobState.RUNNING); return job; } catch (InterruptedException e) { @@ -253,9 +261,6 @@ class JobController { @WorkerThread private @NonNull FullSpec buildFullSpec(@NonNull Job job, @NonNull List dependsOn) { - String id = UUID.randomUUID().toString(); - - job.setId(id); job.setRunAttempt(0); JobSpec jobSpec = new JobSpec(job.getId(), @@ -319,7 +324,6 @@ class JobController { Data data = dataSerializer.deserialize(jobSpec.getSerializedData()); Job job = jobInstantiator.instantiate(jobSpec.getFactoryKey(), parameters, data); - job.setId(jobSpec.getId()); job.setRunAttempt(jobSpec.getRunAttempt()); job.setNextRunAttemptTime(jobSpec.getNextRunAttemptTime()); job.setContext(application); @@ -328,7 +332,7 @@ class JobController { } private @NonNull Job.Parameters buildJobParameters(@NonNull JobSpec jobSpec, @NonNull List constraintSpecs) { - return new Job.Parameters.Builder() + return new Job.Parameters.Builder(jobSpec.getId()) .setCreateTime(jobSpec.getCreateTime()) .setLifespan(jobSpec.getLifespan()) .setMaxAttempts(jobSpec.getMaxAttempts()) diff --git a/src/org/thoughtcrime/securesms/jobmanager/JobManager.java b/src/org/thoughtcrime/securesms/jobmanager/JobManager.java index e8cd81b0f1..52124ac42c 100644 --- a/src/org/thoughtcrime/securesms/jobmanager/JobManager.java +++ b/src/org/thoughtcrime/securesms/jobmanager/JobManager.java @@ -5,6 +5,7 @@ import android.content.Intent; import android.os.Build; import androidx.annotation.NonNull; import androidx.annotation.WorkerThread; +import androidx.lifecycle.LiveData; import org.thoughtcrime.securesms.jobmanager.impl.DefaultExecutorFactory; import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer; @@ -39,6 +40,7 @@ public class JobManager implements ConstraintObserver.Notifier { private final Configuration configuration; private final ExecutorService executor; private final JobController jobController; + private final JobTracker jobTracker; private final Set emptyQueueListeners = new CopyOnWriteArraySet<>(); @@ -46,11 +48,13 @@ public class JobManager implements ConstraintObserver.Notifier { this.application = application; this.configuration = configuration; this.executor = configuration.getExecutorFactory().newSingleThreadExecutor("signal-JobManager"); + this.jobTracker = configuration.getJobTracker(); this.jobController = new JobController(application, configuration.getJobStorage(), configuration.getJobInstantiator(), configuration.getConstraintFactories(), configuration.getDataSerializer(), + configuration.getJobTracker(), Build.VERSION.SDK_INT < 26 ? new AlarmManagerScheduler(application) : new CompositeScheduler(new InAppScheduler(this), new JobSchedulerScheduler(application)), new Debouncer(500), @@ -92,6 +96,23 @@ public class JobManager implements ConstraintObserver.Notifier { }); } + /** + * Add a listener to subscribe to job state updates. Listeners will be invoked on an arbitrary + * background thread. You must eventually call {@link #removeListener(JobTracker.JobListener)} to avoid + * memory leaks. + */ + public void addListener(@NonNull String id, @NonNull JobTracker.JobListener listener) { + jobTracker.addListener(id, listener); + } + + /** + * Unsubscribe the provided listener from all job updates. + */ + public void removeListener(@NonNull JobTracker.JobListener listener) { + jobTracker.removeListener(listener); + } + + /** * Enqueues a single job to be run. */ @@ -160,6 +181,12 @@ public class JobManager implements ConstraintObserver.Notifier { } private void enqueueChain(@NonNull Chain chain) { + for (List jobList : chain.getJobListChain()) { + for (Job job : jobList) { + jobTracker.onStateChange(job.getId(), JobTracker.JobState.PENDING); + } + } + executor.execute(() -> { jobController.submitNewJobChain(chain.getJobListChain()); wakeUp(); @@ -225,6 +252,7 @@ public class JobManager implements ConstraintObserver.Notifier { private final Data.Serializer dataSerializer; private final JobStorage jobStorage; private final JobMigrator jobMigrator; + private final JobTracker jobTracker; private Configuration(int jobThreadCount, @NonNull ExecutorFactory executorFactory, @@ -233,7 +261,8 @@ public class JobManager implements ConstraintObserver.Notifier { @NonNull List constraintObservers, @NonNull Data.Serializer dataSerializer, @NonNull JobStorage jobStorage, - @NonNull JobMigrator jobMigrator) + @NonNull JobMigrator jobMigrator, + @NonNull JobTracker jobTracker) { this.executorFactory = executorFactory; this.jobThreadCount = jobThreadCount; @@ -243,6 +272,7 @@ public class JobManager implements ConstraintObserver.Notifier { this.dataSerializer = dataSerializer; this.jobStorage = jobStorage; this.jobMigrator = jobMigrator; + this.jobTracker = jobTracker; } int getJobThreadCount() { @@ -278,6 +308,10 @@ public class JobManager implements ConstraintObserver.Notifier { return jobMigrator; } + @NonNull JobTracker getJobTracker() { + return jobTracker; + } + public static class Builder { private ExecutorFactory executorFactory = new DefaultExecutorFactory(); @@ -288,6 +322,7 @@ public class JobManager implements ConstraintObserver.Notifier { private Data.Serializer dataSerializer = new JsonDataSerializer(); private JobStorage jobStorage = null; private JobMigrator jobMigrator = null; + private JobTracker jobTracker = new JobTracker(); public @NonNull Builder setJobThreadCount(int jobThreadCount) { this.jobThreadCount = jobThreadCount; @@ -337,7 +372,8 @@ public class JobManager implements ConstraintObserver.Notifier { new ArrayList<>(constraintObservers), dataSerializer, jobStorage, - jobMigrator); + jobMigrator, + jobTracker); } } } diff --git a/src/org/thoughtcrime/securesms/jobmanager/JobTracker.java b/src/org/thoughtcrime/securesms/jobmanager/JobTracker.java new file mode 100644 index 0000000000..1b51c9bb56 --- /dev/null +++ b/src/org/thoughtcrime/securesms/jobmanager/JobTracker.java @@ -0,0 +1,122 @@ +package org.thoughtcrime.securesms.jobmanager; + +import androidx.annotation.NonNull; +import androidx.annotation.Nullable; + +import org.thoughtcrime.securesms.util.LRUCache; +import org.thoughtcrime.securesms.util.concurrent.SignalExecutors; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.Executor; + +/** + * Tracks the state of {@link Job}s and allows callers to listen to changes. + */ +public class JobTracker { + + private final Map trackingStates; + private final Executor listenerExecutor; + + JobTracker() { + this.trackingStates = new LRUCache<>(1000); + this.listenerExecutor = SignalExecutors.BOUNDED; + } + + /** + * Add a listener to subscribe to job state updates. Listeners will be invoked on an arbitrary + * background thread. You must eventually call {@link #removeListener(JobListener)} to avoid + * memory leaks. + */ + synchronized void addListener(@NonNull String id, @NonNull JobListener jobListener) { + TrackingState state = getOrCreateTrackingState(id); + JobState currentJobState = state.getJobState(); + + state.addListener(jobListener); + + if (currentJobState != null) { + listenerExecutor.execute(() -> jobListener.onStateChanged(currentJobState)); + } + } + + /** + * Unsubscribe the provided listener from all job updates. + */ + synchronized void removeListener(@NonNull JobListener jobListener) { + Collection allTrackingState = trackingStates.values(); + + for (TrackingState state : allTrackingState) { + state.removeListener(jobListener); + } + } + + /** + * Update the state of a job with the associated ID. + */ + synchronized void onStateChange(@NonNull String id, @NonNull JobState jobState) { + TrackingState trackingState = getOrCreateTrackingState(id); + trackingState.setJobState(jobState); + + for (JobListener listener : trackingState.getListeners()) { + listenerExecutor.execute(() -> listener.onStateChanged(jobState)); + } + } + + private @NonNull TrackingState getOrCreateTrackingState(@NonNull String id) { + TrackingState state = trackingStates.get(id); + + if (state == null) { + state = new TrackingState(); + } + + trackingStates.put(id, state); + + return state; + } + + public interface JobListener { + void onStateChanged(@NonNull JobState jobState); + } + + public enum JobState { + PENDING(false), RUNNING(false), SUCCESS(true), FAILURE(true), IGNORED(true); + + private final boolean complete; + + JobState(boolean complete) { + this.complete = complete; + } + + public boolean isComplete() { + return complete; + } + } + + private static class TrackingState { + private JobState jobState; + + private final CopyOnWriteArraySet listeners = new CopyOnWriteArraySet<>(); + + void addListener(@NonNull JobListener jobListener) { + listeners.add(jobListener); + } + + void removeListener(@NonNull JobListener jobListener) { + listeners.remove(jobListener); + } + + @NonNull Collection getListeners() { + return listeners; + } + + void setJobState(@NonNull JobState jobState) { + this.jobState = jobState; + } + + @Nullable JobState getJobState() { + return jobState; + } + } +}