Migrated to new JobManager.
org/thoughtcrime/securesms/jobmanager/JobManager.java
@@ -1,138 +1,181 @@
package org.thoughtcrime.securesms.jobmanager;

import android.content.Context;
import android.app.Application;
import android.content.Intent;
import android.os.Build;
import android.support.annotation.NonNull;

import com.annimon.stream.Stream;

import org.thoughtcrime.securesms.jobmanager.impl.DefaultExecutorFactory;
import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer;
import org.thoughtcrime.securesms.jobmanager.migration.WorkManagerMigrator;
import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.util.Debouncer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import androidx.work.BackoffPolicy;
import androidx.work.Constraints;
import androidx.work.Data;
import androidx.work.ExistingWorkPolicy;
import androidx.work.NetworkType;
import androidx.work.OneTimeWorkRequest;
import androidx.work.WorkContinuation;
import androidx.work.WorkManager;
public class JobManager {
/**
 * Allows the scheduling of durable jobs that will be run as early as possible.
 */
public class JobManager implements ConstraintObserver.Notifier {

  private static final String TAG = JobManager.class.getSimpleName();

  private static final Constraints NETWORK_CONSTRAINT = new Constraints.Builder()
                                                                       .setRequiredNetworkType(NetworkType.CONNECTED)
                                                                       .build();

  private final ExecutorService executor;
  private final JobController  jobController;
  private final JobRunner[]    jobRunners;

  private final Executor executor = Executors.newSingleThreadExecutor();

  private final Set<EmptyQueueListener> emptyQueueListeners = new CopyOnWriteArraySet<>();

  private final Context     context;
  private final WorkManager workManager;

  public JobManager(@NonNull Application application, @NonNull Configuration configuration) {
    this.executor      = configuration.getExecutorFactory().newSingleThreadExecutor("JobManager");
    this.jobRunners    = new JobRunner[configuration.getJobThreadCount()];
    this.jobController = new JobController(application,
                                           configuration.getJobStorage(),
                                           configuration.getJobInstantiator(),
                                           configuration.getConstraintFactories(),
                                           configuration.getDataSerializer(),
                                           configuration.getDependencyInjector(),
                                           Build.VERSION.SDK_INT < 26 ? new AlarmManagerScheduler(application)
                                                                      : new CompositeScheduler(new InAppScheduler(this), new JobSchedulerScheduler(application)),
                                           new Debouncer(500),
                                           this::onEmptyQueue);

  public JobManager(@NonNull Context context, @NonNull WorkManager workManager) {
    this.context     = context;
    this.workManager = workManager;
  }
  public Chain startChain(@NonNull Job job) {
    return startChain(Collections.singletonList(job));
  }

  public Chain startChain(@NonNull List<? extends Job> jobs) {
    return new Chain(jobs);
  }

  public void add(Job job) {
    JobParameters jobParameters = job.getJobParameters();

    if (jobParameters == null) {
      throw new IllegalStateException("Jobs must have JobParameters at this stage. (" + job.getClass().getSimpleName() + ")");
    }

    startChain(job).enqueue(jobParameters.getSoloChainParameters());
  }

  private void enqueueChain(@NonNull Chain chain, @NonNull ChainParameters chainParameters) {
    executor.execute(() -> {
      try {
        workManager.pruneWork().getResult().get();
      } catch (ExecutionException | InterruptedException e) {
        Log.w(TAG, "Failed to prune work.", e);
      if (WorkManagerMigrator.needsMigration(application)) {
        Log.i(TAG, "Detected an old WorkManager database. Migrating.");
        WorkManagerMigrator.migrate(application, configuration.getJobStorage(), configuration.getDataSerializer());
      }

      List<List<Job>> jobListChain = chain.getJobListChain();
      List<List<OneTimeWorkRequest>> requestListChain = Stream.of(jobListChain)
                                                              .filter(jobList -> !jobList.isEmpty())
                                                              .map(jobList -> Stream.of(jobList).map(this::toWorkRequest).toList())
                                                              .toList();
      jobController.init();

      if (jobListChain.isEmpty()) {
        throw new IllegalStateException("Enqueued an empty chain.");
      for (int i = 0; i < jobRunners.length; i++) {
        jobRunners[i] = new JobRunner(application, i + 1, jobController);
        jobRunners[i].start();
      }

      for (int i = 0; i < jobListChain.size(); i++) {
        for (int j = 0; j < jobListChain.get(i).size(); j++) {
          jobListChain.get(i).get(j).onSubmit(context, requestListChain.get(i).get(j).getId());
        }
      for (ConstraintObserver constraintObserver : configuration.getConstraintObservers()) {
        constraintObserver.register(this);
      }

      WorkContinuation continuation;

      if (chainParameters.getGroupId().isPresent()) {
        ExistingWorkPolicy policy = chainParameters.shouldIgnoreDuplicates() ? ExistingWorkPolicy.KEEP : ExistingWorkPolicy.APPEND;
        continuation = workManager.beginUniqueWork(chainParameters.getGroupId().get(), policy, requestListChain.get(0));
      } else {
        continuation = workManager.beginWith(requestListChain.get(0));
      if (Build.VERSION.SDK_INT < 26) {
        application.startService(new Intent(application, KeepAliveService.class));
      }

      for (int i = 1; i < requestListChain.size(); i++) {
        continuation = continuation.then(requestListChain.get(i));
      }

      continuation.enqueue();
      wakeUp();
    });
  }
  private OneTimeWorkRequest toWorkRequest(@NonNull Job job) {
    JobParameters jobParameters = job.getJobParameters();

    if (jobParameters == null) {
      throw new IllegalStateException("Jobs must have JobParameters at this stage. (" + job.getClass().getSimpleName() + ")");
    }

    Data.Builder dataBuilder = new Data.Builder().putInt(Job.KEY_RETRY_COUNT, jobParameters.getRetryCount())
                                                 .putLong(Job.KEY_RETRY_UNTIL, jobParameters.getRetryUntil())
                                                 .putLong(Job.KEY_SUBMIT_TIME, System.currentTimeMillis())
                                                 .putBoolean(Job.KEY_REQUIRES_NETWORK, jobParameters.requiresNetwork())
                                                 .putBoolean(Job.KEY_REQUIRES_SQLCIPHER, jobParameters.requiresSqlCipher());
    Data data = job.serialize(dataBuilder);

    OneTimeWorkRequest.Builder requestBuilder = new OneTimeWorkRequest.Builder(job.getClass())
                                                                      .setInputData(data)
                                                                      .setBackoffCriteria(BackoffPolicy.LINEAR, OneTimeWorkRequest.MIN_BACKOFF_MILLIS, TimeUnit.MILLISECONDS);

    if (jobParameters.requiresNetwork()) {
      requestBuilder.setConstraints(NETWORK_CONSTRAINT);
    }

    return requestBuilder.build();
  /**
   * Enqueues a single job to be run.
   */
  public void add(@NonNull Job job) {
    new Chain(this, Collections.singletonList(job)).enqueue();
  }
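A minimal usage sketch for the new single-job API (not part of this commit): jobManager is assumed to be an already-constructed JobManager, and ExampleSendJob is a hypothetical Job subclass whose Job.Factory is registered in the Configuration's job-factory map.

    // Illustrative call site: enqueue one durable job; it is persisted and
    // run as soon as its constraints (e.g. network) are satisfied.
    jobManager.add(new ExampleSendJob());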
  public class Chain {
  /**
   * Begins the creation of a job chain with a single job.
   * @see Chain
   */
  public Chain startChain(@NonNull Job job) {
    return new Chain(this, Collections.singletonList(job));
  }

    private final List<List<Job>> jobs = new LinkedList<>();
  /**
   * Begins the creation of a job chain with a set of jobs that can be run in parallel.
   * @see Chain
   */
  public Chain startChain(@NonNull List<? extends Job> jobs) {
    return new Chain(this, jobs);
  }

  /**
   * Retrieves a string representing the state of the job queue. Intended for debugging.
   */
  public @NonNull String getDebugInfo() {
    Future<String> result = executor.submit(jobController::getDebugInfo);
    try {
      return result.get();
    } catch (ExecutionException | InterruptedException e) {
      Log.w(TAG, "Failed to retrieve Job info.", e);
      return "Failed to retrieve Job info.";
    }
  }
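A small sketch, assuming jobManager is the application's JobManager instance, of how the debug string might be surfaced when collecting a log for a bug report:

    // Dump the current queue state (pending jobs and their constraints) into the app log.
    Log.i("JobManagerExample", "JobManager state:\n" + jobManager.getDebugInfo());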
  /**
   * Adds a listener that will be notified when the job queue has been drained.
   */
  void addOnEmptyQueueListener(@NonNull EmptyQueueListener listener) {
    executor.execute(() -> {
      emptyQueueListeners.add(listener);
    });
  }

  /**
   * Removes a listener that was added via {@link #addOnEmptyQueueListener(EmptyQueueListener)}.
   */
  void removeOnEmptyQueueListener(@NonNull EmptyQueueListener listener) {
    executor.execute(() -> {
      emptyQueueListeners.remove(listener);
    });
  }

  @Override
  public void onConstraintMet(@NonNull String reason) {
    Log.i(TAG, "onConstraintMet(" + reason + ")");
    wakeUp();
  }

  /**
   * Pokes the system to take another pass at the job queue.
   */
  void wakeUp() {
    executor.execute(jobController::wakeUp);
  }
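For context, a sketch of what a ConstraintObserver might look like on the other side of this Notifier contract, based only on the register(this) call and the onConstraintMet(String) override visible in this diff; the class name and the connectivity-broadcast details are illustrative assumptions, not code from this commit.

    import android.app.Application;
    import android.content.BroadcastReceiver;
    import android.content.Context;
    import android.content.Intent;
    import android.content.IntentFilter;
    import android.net.ConnectivityManager;
    import android.support.annotation.NonNull;

    public class ExampleNetworkConstraintObserver implements ConstraintObserver {

      private static final String REASON = ExampleNetworkConstraintObserver.class.getSimpleName();

      private final Application application;

      public ExampleNetworkConstraintObserver(@NonNull Application application) {
        this.application = application;
      }

      @Override
      public void register(@NonNull Notifier notifier) {
        // Poke the JobManager whenever connectivity changes so queued jobs whose
        // network constraint is now met get another chance to run.
        application.registerReceiver(new BroadcastReceiver() {
          @Override
          public void onReceive(Context context, Intent intent) {
            notifier.onConstraintMet(REASON);
          }
        }, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
      }
    }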
  private void enqueueChain(@NonNull Chain chain) {
    executor.execute(() -> {
      jobController.submitNewJobChain(chain.getJobListChain());
      wakeUp();
    });
  }

  private void onEmptyQueue() {
    executor.execute(() -> {
      for (EmptyQueueListener listener : emptyQueueListeners) {
        listener.onQueueEmpty();
      }
    });
  }

  public interface EmptyQueueListener {
    void onQueueEmpty();
  }
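An illustrative sketch of the listener hooks above, assuming a caller inside org.thoughtcrime.securesms.jobmanager (the methods are package-private) and an existing jobManager instance; the logging is only an example.

    // Register for a notification that the queue has drained, e.g. to know
    // when it is safe to run a follow-up step.
    JobManager.EmptyQueueListener listener = () -> Log.i("JobManagerExample", "Job queue drained.");
    jobManager.addOnEmptyQueueListener(listener);

    // Later, once the notification is no longer needed:
    jobManager.removeOnEmptyQueueListener(listener);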
  /**
   * Allows enqueuing work that depends on each other. Jobs that appear later in the chain will
   * only run after all jobs earlier in the chain have been completed. If a job fails, all jobs
   * that occur later in the chain will also be failed.
   */
  public static class Chain {

    private final JobManager      jobManager;
    private final List<List<Job>> jobs;

    private Chain(@NonNull JobManager jobManager, @NonNull List<? extends Job> jobs) {
      this.jobManager = jobManager;
      this.jobs       = new LinkedList<>();

    private Chain(@NonNull List<? extends Job> jobs) {
      this.jobs.add(new ArrayList<>(jobs));
    }

@@ -141,16 +184,146 @@ public class JobManager {
    }

    public Chain then(@NonNull List<Job> jobs) {
      this.jobs.add(new ArrayList<>(jobs));
      if (!jobs.isEmpty()) {
        this.jobs.add(new ArrayList<>(jobs));
      }
      return this;
    }

    public void enqueue(@NonNull ChainParameters chainParameters) {
      enqueueChain(this, chainParameters);
    public void enqueue() {
      jobManager.enqueueChain(this);
    }

    private List<List<Job>> getJobListChain() {
      return jobs;
    }
  }
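A minimal sketch of the new chain API described in the Javadoc above (not part of this commit): ExampleUploadJob and ExampleSendJob are hypothetical Job subclasses with registered factories, and jobManager is an existing instance.

    // The send link only runs after the upload link completes successfully;
    // if the upload fails, the send link is failed as well.
    jobManager.startChain(new ExampleUploadJob())
              .then(Collections.singletonList(new ExampleSendJob()))
              .enqueue();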
  public static class Configuration {

    private final ExecutorFactory          executorFactory;
    private final int                      jobThreadCount;
    private final JobInstantiator          jobInstantiator;
    private final ConstraintInstantiator   constraintInstantiator;
    private final List<ConstraintObserver> constraintObservers;
    private final Data.Serializer          dataSerializer;
    private final JobStorage               jobStorage;
    private final DependencyInjector       dependencyInjector;

    private Configuration(int jobThreadCount,
                          @NonNull ExecutorFactory executorFactory,
                          @NonNull JobInstantiator jobInstantiator,
                          @NonNull ConstraintInstantiator constraintInstantiator,
                          @NonNull List<ConstraintObserver> constraintObservers,
                          @NonNull Data.Serializer dataSerializer,
                          @NonNull JobStorage jobStorage,
                          @NonNull DependencyInjector dependencyInjector)
    {
      this.executorFactory        = executorFactory;
      this.jobThreadCount         = jobThreadCount;
      this.jobInstantiator        = jobInstantiator;
      this.constraintInstantiator = constraintInstantiator;
      this.constraintObservers    = constraintObservers;
      this.dataSerializer         = dataSerializer;
      this.jobStorage             = jobStorage;
      this.dependencyInjector     = dependencyInjector;
    }

    int getJobThreadCount() {
      return jobThreadCount;
    }

    @NonNull ExecutorFactory getExecutorFactory() {
      return executorFactory;
    }

    @NonNull JobInstantiator getJobInstantiator() {
      return jobInstantiator;
    }

    @NonNull ConstraintInstantiator getConstraintFactories() {
      return constraintInstantiator;
    }

    @NonNull List<ConstraintObserver> getConstraintObservers() {
      return constraintObservers;
    }

    @NonNull Data.Serializer getDataSerializer() {
      return dataSerializer;
    }

    @NonNull JobStorage getJobStorage() {
      return jobStorage;
    }

    @NonNull DependencyInjector getDependencyInjector() {
      return dependencyInjector;
    }
    public static class Builder {

      private ExecutorFactory executorFactory = new DefaultExecutorFactory();
      private int jobThreadCount = Math.max(2, Math.min(Runtime.getRuntime().availableProcessors() - 1, 4));
      private Map<String, Job.Factory> jobFactories = new HashMap<>();
      private Map<String, Constraint.Factory> constraintFactories = new HashMap<>();
      private List<ConstraintObserver> constraintObservers = new ArrayList<>();
      private Data.Serializer dataSerializer = new JsonDataSerializer();
      private JobStorage jobStorage = null;
      private DependencyInjector dependencyInjector = o -> { /*noop*/ };

      public @NonNull Builder setJobThreadCount(int jobThreadCount) {
        this.jobThreadCount = jobThreadCount;
        return this;
      }

      public @NonNull Builder setExecutorFactory(@NonNull ExecutorFactory executorFactory) {
        this.executorFactory = executorFactory;
        return this;
      }

      public @NonNull Builder setJobFactories(@NonNull Map<String, Job.Factory> jobFactories) {
        this.jobFactories = jobFactories;
        return this;
      }

      public @NonNull Builder setConstraintFactories(@NonNull Map<String, Constraint.Factory> constraintFactories) {
        this.constraintFactories = constraintFactories;
        return this;
      }

      public @NonNull Builder setConstraintObservers(@NonNull List<ConstraintObserver> constraintObservers) {
        this.constraintObservers = constraintObservers;
        return this;
      }

      public @NonNull Builder setDataSerializer(@NonNull Data.Serializer dataSerializer) {
        this.dataSerializer = dataSerializer;
        return this;
      }

      public @NonNull Builder setJobStorage(@NonNull JobStorage jobStorage) {
        this.jobStorage = jobStorage;
        return this;
      }

      public @NonNull Builder setDependencyInjector(@NonNull DependencyInjector dependencyInjector) {
        this.dependencyInjector = dependencyInjector;
        return this;
      }

      public @NonNull Configuration build() {
        return new Configuration(jobThreadCount,
                                 executorFactory,
                                 new JobInstantiator(jobFactories),
                                 new ConstraintInstantiator(constraintFactories),
                                 new ArrayList<>(constraintObservers),
                                 dataSerializer,
                                 jobStorage,
                                 dependencyInjector);
      }
    }
  }
}
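A hedged sketch of how an Application class might wire this up with the Builder above; the JobManagerFactories helper, the FastJobStorage and job-database names, and the use of the Application as the DependencyInjector are assumptions for illustration rather than code taken from this commit.

    // In the Application subclass's onCreate() (illustrative wiring):
    JobManager jobManager = new JobManager(this, new JobManager.Configuration.Builder()
        .setDataSerializer(new JsonDataSerializer())
        .setJobFactories(JobManagerFactories.getJobFactories(this))              // assumed map of job name -> Job.Factory
        .setConstraintFactories(JobManagerFactories.getConstraintFactories(this)) // assumed map of constraint name -> Constraint.Factory
        .setConstraintObservers(JobManagerFactories.getConstraintObservers(this)) // assumed list, e.g. a network observer
        .setJobStorage(new FastJobStorage(DatabaseFactory.getJobDatabase(this)))  // assumed persistent JobStorage implementation
        .setDependencyInjector(this)                                              // assumed: the Application injects job dependencies
        .build());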