clean up old job manager

This commit is contained in:
Ryan Zhao
2023-05-08 12:29:21 +10:00
parent 2b48b52df0
commit 2ceb9e2bf4
23 changed files with 20 additions and 1900 deletions

View File

@@ -1,69 +0,0 @@
package org.thoughtcrime.securesms.jobmanager;
import android.app.AlarmManager;
import android.app.Application;
import android.app.PendingIntent;
import android.content.BroadcastReceiver;
import android.content.Context;
import android.content.Intent;
import androidx.annotation.NonNull;
import com.annimon.stream.Stream;
import org.session.libsignal.utilities.Log;
import org.thoughtcrime.securesms.ApplicationContext;
import java.util.List;
import java.util.UUID;
import network.loki.messenger.BuildConfig;
/**
* Schedules tasks using the {@link AlarmManager}.
*
* Given that this scheduler is only used when {@link KeepAliveService} is also used (which keeps
* all of the {@link ConstraintObserver}s running), this only needs to schedule future runs in
* situations where all constraints are already met. Otherwise, the {@link ConstraintObserver}s will
* trigger future runs when the constraints are met.
*
* For the same reason, this class also doesn't have to schedule jobs that don't have delays.
*
* Important: Only use on API < 26.
*/
public class AlarmManagerScheduler implements Scheduler {
private static final String TAG = AlarmManagerScheduler.class.getSimpleName();
private final Application application;
AlarmManagerScheduler(@NonNull Application application) {
this.application = application;
}
@Override
public void schedule(long delay, @NonNull List<Constraint> constraints) {
if (delay > 0 && Stream.of(constraints).allMatch(Constraint::isMet)) {
setUniqueAlarm(application, System.currentTimeMillis() + delay);
}
}
private void setUniqueAlarm(@NonNull Context context, long time) {
AlarmManager alarmManager = (AlarmManager) context.getSystemService(Context.ALARM_SERVICE);
Intent intent = new Intent(context, RetryReceiver.class);
intent.setAction(BuildConfig.APPLICATION_ID + UUID.randomUUID().toString());
alarmManager.set(AlarmManager.RTC_WAKEUP, time, PendingIntent.getBroadcast(context, 0, intent, PendingIntent.FLAG_IMMUTABLE));
Log.i(TAG, "Set an alarm to retry a job in " + (time - System.currentTimeMillis()) + " ms.");
}
public static class RetryReceiver extends BroadcastReceiver {
@Override
public void onReceive(Context context, Intent intent) {
Log.i(TAG, "Received an alarm to retry a job.");
ApplicationContext.getInstance(context).getJobManager().wakeUp();
}
}
}

View File

@@ -1,22 +0,0 @@
package org.thoughtcrime.securesms.jobmanager;
import androidx.annotation.NonNull;
import java.util.Arrays;
import java.util.List;
class CompositeScheduler implements Scheduler {
private final List<Scheduler> schedulers;
CompositeScheduler(@NonNull Scheduler... schedulers) {
this.schedulers = Arrays.asList(schedulers);
}
@Override
public void schedule(long delay, @NonNull List<Constraint> constraints) {
for (Scheduler scheduler : schedulers) {
scheduler.schedule(delay, constraints);
}
}
}

View File

@@ -1,23 +0,0 @@
package org.thoughtcrime.securesms.jobmanager;
import androidx.annotation.NonNull;
import java.util.HashMap;
import java.util.Map;
public class ConstraintInstantiator {
private final Map<String, Constraint.Factory> constraintFactories;
ConstraintInstantiator(@NonNull Map<String, Constraint.Factory> constraintFactories) {
this.constraintFactories = new HashMap<>(constraintFactories);
}
public @NonNull Constraint instantiate(@NonNull String constraintFactoryKey) {
if (constraintFactories.containsKey(constraintFactoryKey)) {
return constraintFactories.get(constraintFactoryKey).create();
} else {
throw new IllegalStateException("Tried to instantiate a constraint with key '" + constraintFactoryKey + "', but no matching factory was found.");
}
}
}

View File

@@ -1,24 +0,0 @@
/**
* Copyright (C) 2014 Open Whisper Systems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.thoughtcrime.securesms.jobmanager;
/**
* Interface responsible for injecting dependencies into Jobs.
*/
public interface DependencyInjector {
void injectDependencies(Object object);
}

View File

@@ -1,49 +0,0 @@
package org.thoughtcrime.securesms.jobmanager;
import android.os.Handler;
import android.os.HandlerThread;
import androidx.annotation.NonNull;
import com.annimon.stream.Stream;
import org.session.libsignal.utilities.Log;
import java.util.List;
/**
* Schedules future runs on an in-app handler. Intended to be used in combination with a persistent
* {@link Scheduler} to improve responsiveness when the app is open.
*
* This should only schedule runs when all constraints are met. Because this only works when the
* app is foregrounded, jobs that don't have their constraints met will be run when the relevant
* {@link ConstraintObserver} is triggered.
*
* Similarly, this does not need to schedule retries with no delay, as this doesn't provide any
* persistence, and other mechanisms will take care of that.
*/
class InAppScheduler implements Scheduler {
private static final String TAG = InAppScheduler.class.getSimpleName();
private final JobManager jobManager;
private final Handler handler;
InAppScheduler(@NonNull JobManager jobManager) {
HandlerThread handlerThread = new HandlerThread("InAppScheduler");
handlerThread.start();
this.jobManager = jobManager;
this.handler = new Handler(handlerThread.getLooper());
}
@Override
public void schedule(long delay, @NonNull List<Constraint> constraints) {
if (delay > 0 && Stream.of(constraints).allMatch(Constraint::isMet)) {
Log.i(TAG, "Scheduling a retry in " + delay + " ms.");
handler.postDelayed(() -> {
Log.i(TAG, "Triggering a job retry.");
jobManager.wakeUp();
}, delay);
}
}
}

View File

@@ -1,286 +0,0 @@
package org.thoughtcrime.securesms.jobmanager;
import android.content.Context;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.annotation.WorkerThread;
import org.session.libsession.messaging.utilities.Data;
import org.session.libsignal.utilities.Log;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* A durable unit of work.
*
* Jobs have {@link Parameters} that describe the conditions upon when you'd like them to run, how
* often they should be retried, and how long they should be retried for.
*
* Never rely on a specific instance of this class being run. It can be created and destroyed as the
* job is retried. State that you want to save is persisted to a {@link Data} object in
* {@link #serialize()}. Your job is then recreated using a {@link Factory} that you register in
* {@link JobManager.Configuration.Builder#setJobFactories(Map)}, which is given the saved
* {@link Data} bundle.
*
* @deprecated
* use <a href="https://developer.android.com/reference/androidx/work/WorkManager">WorkManager</a>
* API instead.
*/
public abstract class Job {
private static final String TAG = Log.tag(Job.class);
private final Parameters parameters;
private String id;
private int runAttempt;
private long nextRunAttemptTime;
protected Context context;
public Job(@NonNull Parameters parameters) {
this.parameters = parameters;
}
public final String getId() {
return id;
}
public final @NonNull Parameters getParameters() {
return parameters;
}
public final int getRunAttempt() {
return runAttempt;
}
public final long getNextRunAttemptTime() {
return nextRunAttemptTime;
}
/**
* This is already called by {@link JobController} during job submission, but if you ever run a
* job without submitting it to the {@link JobManager}, then you'll need to invoke this yourself.
*/
public final void setContext(@NonNull Context context) {
this.context = context;
}
/** Should only be invoked by {@link JobController} */
final void setId(@NonNull String id) {
this.id = id;
}
/** Should only be invoked by {@link JobController} */
final void setRunAttempt(int runAttempt) {
this.runAttempt = runAttempt;
}
/** Should only be invoked by {@link JobController} */
final void setNextRunAttemptTime(long nextRunAttemptTime) {
this.nextRunAttemptTime = nextRunAttemptTime;
}
@WorkerThread
final void onSubmit() {
Log.i(TAG, JobLogger.format(this, "onSubmit()"));
onAdded();
}
/**
* Called when the job is first submitted to the {@link JobManager}.
*/
@WorkerThread
public void onAdded() {
}
/**
* Called after a job has run and its determined that a retry is required.
*/
@WorkerThread
public void onRetry() {
}
/**
* Serialize your job state so that it can be recreated in the future.
*/
public abstract @NonNull Data serialize();
/**
* Returns the key that can be used to find the relevant factory needed to create your job.
*/
public abstract @NonNull String getFactoryKey();
/**
* Called to do your actual work.
*/
@WorkerThread
public abstract @NonNull Result run();
/**
* Called when your job has completely failed.
*/
@WorkerThread
public abstract void onCanceled();
public interface Factory<T extends Job> {
@NonNull T create(@NonNull Parameters parameters, @NonNull Data data);
}
public enum Result {
SUCCESS, FAILURE, RETRY
}
public static final class Parameters {
public static final int IMMORTAL = -1;
public static final int UNLIMITED = -1;
private final long createTime;
private final long lifespan;
private final int maxAttempts;
private final long maxBackoff;
private final int maxInstances;
private final String queue;
private final List<String> constraintKeys;
private Parameters(long createTime,
long lifespan,
int maxAttempts,
long maxBackoff,
int maxInstances,
@Nullable String queue,
@NonNull List<String> constraintKeys)
{
this.createTime = createTime;
this.lifespan = lifespan;
this.maxAttempts = maxAttempts;
this.maxBackoff = maxBackoff;
this.maxInstances = maxInstances;
this.queue = queue;
this.constraintKeys = constraintKeys;
}
public long getCreateTime() {
return createTime;
}
public long getLifespan() {
return lifespan;
}
public int getMaxAttempts() {
return maxAttempts;
}
public long getMaxBackoff() {
return maxBackoff;
}
public int getMaxInstances() {
return maxInstances;
}
public @Nullable String getQueue() {
return queue;
}
public List<String> getConstraintKeys() {
return constraintKeys;
}
public static final class Builder {
private long createTime = System.currentTimeMillis();
private long maxBackoff = TimeUnit.SECONDS.toMillis(30);
private long lifespan = IMMORTAL;
private int maxAttempts = 1;
private int maxInstances = UNLIMITED;
private String queue = null;
private List<String> constraintKeys = new LinkedList<>();
/** Should only be invoked by {@link JobController} */
Builder setCreateTime(long createTime) {
this.createTime = createTime;
return this;
}
/**
* Specify the amount of time this job is allowed to be retried. Defaults to {@link #IMMORTAL}.
*/
public @NonNull Builder setLifespan(long lifespan) {
this.lifespan = lifespan;
return this;
}
/**
* Specify the maximum number of times you want to attempt this job. Defaults to 1.
*/
public @NonNull Builder setMaxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
}
/**
* Specify the longest amount of time to wait between retries. No guarantees that this will
* be respected on API >= 26.
*/
public @NonNull Builder setMaxBackoff(long maxBackoff) {
this.maxBackoff = maxBackoff;
return this;
}
/**
* Specify the maximum number of instances you'd want of this job at any given time. If
* enqueueing this job would put it over that limit, it will be ignored.
*
* Duplicates are determined by two jobs having the same {@link Job#getFactoryKey()}.
*
* This property is ignored if the job is submitted as part of a {@link JobManager.Chain}.
*
* Defaults to {@link #UNLIMITED}.
*/
public @NonNull Builder setMaxInstances(int maxInstances) {
this.maxInstances = maxInstances;
return this;
}
/**
* Specify a string representing a queue. All jobs within the same queue are run in a
* serialized fashion -- one after the other, in order of insertion. Failure of a job earlier
* in the queue has no impact on the execution of jobs later in the queue.
*/
public @NonNull Builder setQueue(@Nullable String queue) {
this.queue = queue;
return this;
}
/**
* Add a constraint via the key that was used to register its factory in
* {@link JobManager.Configuration)};
*/
public @NonNull Builder addConstraint(@NonNull String constraintKey) {
constraintKeys.add(constraintKey);
return this;
}
/**
* Set constraints via the key that was used to register its factory in
* {@link JobManager.Configuration)};
*/
public @NonNull Builder setConstraints(@NonNull List<String> constraintKeys) {
this.constraintKeys.clear();
this.constraintKeys.addAll(constraintKeys);
return this;
}
public @NonNull Parameters build() {
return new Parameters(createTime, lifespan, maxAttempts, maxBackoff, maxInstances, queue, constraintKeys);
}
}
}
}

View File

@@ -1,354 +0,0 @@
package org.thoughtcrime.securesms.jobmanager;
import android.app.Application;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.annotation.WorkerThread;
import com.annimon.stream.Stream;
import org.session.libsession.messaging.utilities.Data;
import org.session.libsession.utilities.Debouncer;
import org.session.libsignal.utilities.Log;
import org.thoughtcrime.securesms.jobmanager.persistence.ConstraintSpec;
import org.thoughtcrime.securesms.jobmanager.persistence.DependencySpec;
import org.thoughtcrime.securesms.jobmanager.persistence.FullSpec;
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec;
import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
/**
* Manages the queue of jobs. This is the only class that should write to {@link JobStorage} to
* ensure consistency.
*/
class JobController {
private static final String TAG = JobController.class.getSimpleName();
private final Application application;
private final JobStorage jobStorage;
private final JobInstantiator jobInstantiator;
private final ConstraintInstantiator constraintInstantiator;
private final Data.Serializer dataSerializer;
private final Scheduler scheduler;
private final Debouncer debouncer;
private final Callback callback;
private final Set<String> runningJobs;
JobController(@NonNull Application application,
@NonNull JobStorage jobStorage,
@NonNull JobInstantiator jobInstantiator,
@NonNull ConstraintInstantiator constraintInstantiator,
@NonNull Data.Serializer dataSerializer,
@NonNull Scheduler scheduler,
@NonNull Debouncer debouncer,
@NonNull Callback callback)
{
this.application = application;
this.jobStorage = jobStorage;
this.jobInstantiator = jobInstantiator;
this.constraintInstantiator = constraintInstantiator;
this.dataSerializer = dataSerializer;
this.scheduler = scheduler;
this.debouncer = debouncer;
this.callback = callback;
this.runningJobs = new HashSet<>();
}
@WorkerThread
synchronized void init() {
jobStorage.init();
jobStorage.updateAllJobsToBePending();
notifyAll();
}
synchronized void wakeUp() {
notifyAll();
}
@WorkerThread
synchronized void submitNewJobChain(@NonNull List<List<Job>> chain) {
chain = Stream.of(chain).filterNot(List::isEmpty).toList();
if (chain.isEmpty()) {
Log.w(TAG, "Tried to submit an empty job chain. Skipping.");
return;
}
if (chainExceedsMaximumInstances(chain)) {
Job solo = chain.get(0).get(0);
Log.w(TAG, JobLogger.format(solo, "Already at the max instance count of " + solo.getParameters().getMaxInstances() + ". Skipping."));
return;
}
insertJobChain(chain);
scheduleJobs(chain.get(0));
triggerOnSubmit(chain);
notifyAll();
}
@WorkerThread
synchronized void onRetry(@NonNull Job job) {
int nextRunAttempt = job.getRunAttempt() + 1;
long nextRunAttemptTime = calculateNextRunAttemptTime(System.currentTimeMillis(), nextRunAttempt, job.getParameters().getMaxBackoff());
jobStorage.updateJobAfterRetry(job.getId(), false, nextRunAttempt, nextRunAttemptTime);
List<Constraint> constraints = Stream.of(jobStorage.getConstraintSpecs(job.getId()))
.map(ConstraintSpec::getFactoryKey)
.map(constraintInstantiator::instantiate)
.toList();
long delay = Math.max(0, nextRunAttemptTime - System.currentTimeMillis());
Log.i(TAG, JobLogger.format(job, "Scheduling a retry in " + delay + " ms."));
scheduler.schedule(delay, constraints);
notifyAll();
}
synchronized void onJobFinished(@NonNull Job job) {
runningJobs.remove(job.getId());
}
@WorkerThread
synchronized void onSuccess(@NonNull Job job) {
jobStorage.deleteJob(job.getId());
notifyAll();
}
/**
* @return The list of all dependent jobs that should also be failed.
*/
@WorkerThread
synchronized @NonNull List<Job> onFailure(@NonNull Job job) {
List<Job> dependents = Stream.of(jobStorage.getDependencySpecsThatDependOnJob(job.getId()))
.map(DependencySpec::getJobId)
.map(jobStorage::getJobSpec)
.withoutNulls()
.map(jobSpec -> {
List<ConstraintSpec> constraintSpecs = jobStorage.getConstraintSpecs(jobSpec.getId());
return createJob(jobSpec, constraintSpecs);
})
.toList();
List<Job> all = new ArrayList<>(dependents.size() + 1);
all.add(job);
all.addAll(dependents);
jobStorage.deleteJobs(Stream.of(all).map(Job::getId).toList());
return dependents;
}
/**
* Retrieves the next job that is eligible for execution. To be 'eligible' means that the job:
* - Has no dependencies
* - Has no unmet constraints
*
* This method will block until a job is available.
* When the job returned from this method has been run, you must call {@link #onJobFinished(Job)}.
*/
@WorkerThread
synchronized @NonNull Job pullNextEligibleJobForExecution() {
try {
Job job;
while ((job = getNextEligibleJobForExecution()) == null) {
if (runningJobs.isEmpty()) {
debouncer.publish(callback::onEmpty);
}
wait();
}
jobStorage.updateJobRunningState(job.getId(), true);
runningJobs.add(job.getId());
return job;
} catch (InterruptedException e) {
Log.e(TAG, "Interrupted.");
throw new AssertionError(e);
}
}
/**
* Retrieves a string representing the state of the job queue. Intended for debugging.
*/
@WorkerThread
synchronized @NonNull String getDebugInfo() {
List<JobSpec> jobs = jobStorage.getAllJobSpecs();
List<ConstraintSpec> constraints = jobStorage.getAllConstraintSpecs();
List<DependencySpec> dependencies = jobStorage.getAllDependencySpecs();
StringBuilder info = new StringBuilder();
info.append("-- Jobs\n");
if (!jobs.isEmpty()) {
Stream.of(jobs).forEach(j -> info.append(j.toString()).append('\n'));
} else {
info.append("None\n");
}
info.append("\n-- Constraints\n");
if (!constraints.isEmpty()) {
Stream.of(constraints).forEach(c -> info.append(c.toString()).append('\n'));
} else {
info.append("None\n");
}
info.append("\n-- Dependencies\n");
if (!dependencies.isEmpty()) {
Stream.of(dependencies).forEach(d -> info.append(d.toString()).append('\n'));
} else {
info.append("None\n");
}
return info.toString();
}
@WorkerThread
private boolean chainExceedsMaximumInstances(@NonNull List<List<Job>> chain) {
if (chain.size() == 1 && chain.get(0).size() == 1) {
Job solo = chain.get(0).get(0);
if (solo.getParameters().getMaxInstances() != Job.Parameters.UNLIMITED &&
jobStorage.getJobInstanceCount(solo.getFactoryKey()) >= solo.getParameters().getMaxInstances())
{
return true;
}
}
return false;
}
@WorkerThread
private void triggerOnSubmit(@NonNull List<List<Job>> chain) {
Stream.of(chain)
.forEach(list -> Stream.of(list).forEach(job -> {
job.setContext(application);
job.onSubmit();
}));
}
@WorkerThread
private void insertJobChain(@NonNull List<List<Job>> chain) {
List<FullSpec> fullSpecs = new LinkedList<>();
List<Job> dependsOn = Collections.emptyList();
for (List<Job> jobList : chain) {
for (Job job : jobList) {
fullSpecs.add(buildFullSpec(job, dependsOn));
}
dependsOn = jobList;
}
jobStorage.insertJobs(fullSpecs);
}
@WorkerThread
private @NonNull FullSpec buildFullSpec(@NonNull Job job, @NonNull List<Job> dependsOn) {
String id = UUID.randomUUID().toString();
job.setId(id);
job.setRunAttempt(0);
JobSpec jobSpec = new JobSpec(job.getId(),
job.getFactoryKey(),
job.getParameters().getQueue(),
job.getParameters().getCreateTime(),
job.getNextRunAttemptTime(),
job.getRunAttempt(),
job.getParameters().getMaxAttempts(),
job.getParameters().getMaxBackoff(),
job.getParameters().getLifespan(),
job.getParameters().getMaxInstances(),
dataSerializer.serialize(job.serialize()),
false);
List<ConstraintSpec> constraintSpecs = Stream.of(job.getParameters().getConstraintKeys())
.map(key -> new ConstraintSpec(jobSpec.getId(), key))
.toList();
List<DependencySpec> dependencySpecs = Stream.of(dependsOn)
.map(depends -> new DependencySpec(job.getId(), depends.getId()))
.toList();
return new FullSpec(jobSpec, constraintSpecs, dependencySpecs);
}
@WorkerThread
private void scheduleJobs(@NonNull List<Job> jobs) {
for (Job job : jobs) {
List<Constraint> constraints = Stream.of(job.getParameters().getConstraintKeys())
.map(key -> new ConstraintSpec(job.getId(), key))
.map(ConstraintSpec::getFactoryKey)
.map(constraintInstantiator::instantiate)
.toList();
scheduler.schedule(0, constraints);
}
}
@WorkerThread
private @Nullable Job getNextEligibleJobForExecution() {
List<JobSpec> jobSpecs = jobStorage.getPendingJobsWithNoDependenciesInCreatedOrder(System.currentTimeMillis());
for (JobSpec jobSpec : jobSpecs) {
List<ConstraintSpec> constraintSpecs = jobStorage.getConstraintSpecs(jobSpec.getId());
List<Constraint> constraints = Stream.of(constraintSpecs)
.map(ConstraintSpec::getFactoryKey)
.map(constraintInstantiator::instantiate)
.toList();
if (Stream.of(constraints).allMatch(Constraint::isMet)) {
return createJob(jobSpec, constraintSpecs);
}
}
return null;
}
private @NonNull Job createJob(@NonNull JobSpec jobSpec, @NonNull List<ConstraintSpec> constraintSpecs) {
Job.Parameters parameters = buildJobParameters(jobSpec, constraintSpecs);
Data data = dataSerializer.deserialize(jobSpec.getSerializedData());
Job job = jobInstantiator.instantiate(jobSpec.getFactoryKey(), parameters, data);
job.setId(jobSpec.getId());
job.setRunAttempt(jobSpec.getRunAttempt());
job.setNextRunAttemptTime(jobSpec.getNextRunAttemptTime());
job.setContext(application);
return job;
}
private @NonNull Job.Parameters buildJobParameters(@NonNull JobSpec jobSpec, @NonNull List<ConstraintSpec> constraintSpecs) {
return new Job.Parameters.Builder()
.setCreateTime(jobSpec.getCreateTime())
.setLifespan(jobSpec.getLifespan())
.setMaxAttempts(jobSpec.getMaxAttempts())
.setQueue(jobSpec.getQueueKey())
.setConstraints(Stream.of(constraintSpecs).map(ConstraintSpec::getFactoryKey).toList())
.build();
}
private long calculateNextRunAttemptTime(long currentTime, int nextAttempt, long maxBackoff) {
int boundedAttempt = Math.min(nextAttempt, 30);
long exponentialBackoff = (long) Math.pow(2, boundedAttempt) * 1000;
long actualBackoff = Math.min(exponentialBackoff, maxBackoff);
return currentTime + actualBackoff;
}
interface Callback {
void onEmpty();
}
}

View File

@@ -2,6 +2,7 @@ package org.thoughtcrime.securesms.jobmanager;
import androidx.annotation.NonNull;
import org.session.libsession.messaging.jobs.Job;
import org.session.libsession.messaging.utilities.Data;
import java.util.HashMap;
@@ -15,9 +16,9 @@ class JobInstantiator {
this.jobFactories = new HashMap<>(jobFactories);
}
public @NonNull Job instantiate(@NonNull String jobFactoryKey, @NonNull Job.Parameters parameters, @NonNull Data data) {
public @NonNull Job instantiate(@NonNull String jobFactoryKey, @NonNull Data data) {
if (jobFactories.containsKey(jobFactoryKey)) {
return jobFactories.get(jobFactoryKey).create(parameters, data);
return jobFactories.get(jobFactoryKey).create(data);
} else {
throw new IllegalStateException("Tried to instantiate a job with key '" + jobFactoryKey + "', but no matching factory was found.");
}

View File

@@ -1,24 +0,0 @@
package org.thoughtcrime.securesms.jobmanager;
import androidx.annotation.NonNull;
import android.text.TextUtils;
public class JobLogger {
public static String format(@NonNull Job job, @NonNull String event) {
return format(job, "", event);
}
public static String format(@NonNull Job job, @NonNull String extraTag, @NonNull String event) {
String id = job.getId();
String tag = TextUtils.isEmpty(extraTag) ? "" : "[" + extraTag + "]";
long timeSinceSubmission = System.currentTimeMillis() - job.getParameters().getCreateTime();
int runAttempt = job.getRunAttempt() + 1;
String maxAttempts = job.getParameters().getMaxAttempts() == Job.Parameters.UNLIMITED ? "Unlimited"
: String.valueOf(job.getParameters().getMaxAttempts());
String lifespan = job.getParameters().getLifespan() == Job.Parameters.IMMORTAL ? "Immortal"
: String.valueOf(job.getParameters().getLifespan()) + " ms";
return String.format("[%s][%s]%s %s (Time Since Submission: %d ms, Lifespan: %s, Run Attempt: %d/%s)",
id, job.getClass().getSimpleName(), tag, event, timeSinceSubmission, lifespan, runAttempt, maxAttempts);
}
}

View File

@@ -6,24 +6,14 @@ import android.os.Build;
import androidx.annotation.NonNull;
import org.session.libsession.messaging.utilities.Data;
import org.session.libsession.utilities.Debouncer;
import org.session.libsignal.utilities.Log;
import org.thoughtcrime.securesms.jobmanager.impl.DefaultExecutorFactory;
import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer;
import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
/**
* Allows the scheduling of durable jobs that will be run as early as possible.
@@ -33,32 +23,13 @@ public class JobManager implements ConstraintObserver.Notifier {
private static final String TAG = JobManager.class.getSimpleName();
private final ExecutorService executor;
private final JobController jobController;
private final JobRunner[] jobRunners;
private final Set<EmptyQueueListener> emptyQueueListeners = new CopyOnWriteArraySet<>();
public JobManager(@NonNull Application application, @NonNull Configuration configuration) {
this.executor = configuration.getExecutorFactory().newSingleThreadExecutor("JobManager");
this.jobRunners = new JobRunner[configuration.getJobThreadCount()];
this.jobController = new JobController(application,
configuration.getJobStorage(),
configuration.getJobInstantiator(),
configuration.getConstraintFactories(),
configuration.getDataSerializer(),
Build.VERSION.SDK_INT < 26 ? new AlarmManagerScheduler(application)
: new CompositeScheduler(new InAppScheduler(this), new JobSchedulerScheduler(application)),
new Debouncer(500),
this::onEmptyQueue);
executor.execute(() -> {
jobController.init();
for (int i = 0; i < jobRunners.length; i++) {
jobRunners[i] = new JobRunner(application, i + 1, jobController);
jobRunners[i].start();
}
for (ConstraintObserver constraintObserver : configuration.getConstraintObservers()) {
constraintObserver.register(this);
}
@@ -71,13 +42,6 @@ public class JobManager implements ConstraintObserver.Notifier {
});
}
/**
* Enqueues a single job to be run.
*/
public void add(@NonNull Job job) {
new Chain(this, Collections.singletonList(job)).enqueue();
}
/**
* Adds a listener to that will be notified when the job queue has been drained.
*/
@@ -105,166 +69,45 @@ public class JobManager implements ConstraintObserver.Notifier {
/**
* Pokes the system to take another pass at the job queue.
*/
void wakeUp() {
executor.execute(jobController::wakeUp);
}
private void enqueueChain(@NonNull Chain chain) {
executor.execute(() -> {
jobController.submitNewJobChain(chain.getJobListChain());
wakeUp();
});
}
private void onEmptyQueue() {
executor.execute(() -> {
for (EmptyQueueListener listener : emptyQueueListeners) {
listener.onQueueEmpty();
}
});
}
void wakeUp() {}
public interface EmptyQueueListener {
void onQueueEmpty();
}
/**
* Allows enqueuing work that depends on each other. Jobs that appear later in the chain will
* only run after all jobs earlier in the chain have been completed. If a job fails, all jobs
* that occur later in the chain will also be failed.
*/
public static class Chain {
private final JobManager jobManager;
private final List<List<Job>> jobs;
private Chain(@NonNull JobManager jobManager, @NonNull List<? extends Job> jobs) {
this.jobManager = jobManager;
this.jobs = new LinkedList<>();
this.jobs.add(new ArrayList<>(jobs));
}
public Chain then(@NonNull Job job) {
return then(Collections.singletonList(job));
}
public Chain then(@NonNull List<Job> jobs) {
if (!jobs.isEmpty()) {
this.jobs.add(new ArrayList<>(jobs));
}
return this;
}
public void enqueue() {
jobManager.enqueueChain(this);
}
private List<List<Job>> getJobListChain() {
return jobs;
}
}
public static class Configuration {
private final ExecutorFactory executorFactory;
private final int jobThreadCount;
private final JobInstantiator jobInstantiator;
private final ConstraintInstantiator constraintInstantiator;
private final List<ConstraintObserver> constraintObservers;
private final Data.Serializer dataSerializer;
private final JobStorage jobStorage;
private Configuration(int jobThreadCount,
@NonNull ExecutorFactory executorFactory,
@NonNull JobInstantiator jobInstantiator,
@NonNull ConstraintInstantiator constraintInstantiator,
@NonNull List<ConstraintObserver> constraintObservers,
@NonNull Data.Serializer dataSerializer,
@NonNull JobStorage jobStorage)
private Configuration(@NonNull ExecutorFactory executorFactory,
@NonNull List<ConstraintObserver> constraintObservers)
{
this.executorFactory = executorFactory;
this.jobThreadCount = jobThreadCount;
this.jobInstantiator = jobInstantiator;
this.constraintInstantiator = constraintInstantiator;
this.constraintObservers = constraintObservers;
this.dataSerializer = dataSerializer;
this.jobStorage = jobStorage;
}
int getJobThreadCount() {
return jobThreadCount;
}
@NonNull ExecutorFactory getExecutorFactory() {
return executorFactory;
}
@NonNull JobInstantiator getJobInstantiator() {
return jobInstantiator;
}
@NonNull
ConstraintInstantiator getConstraintFactories() {
return constraintInstantiator;
}
@NonNull List<ConstraintObserver> getConstraintObservers() {
return constraintObservers;
}
@NonNull Data.Serializer getDataSerializer() {
return dataSerializer;
}
@NonNull JobStorage getJobStorage() {
return jobStorage;
}
public static class Builder {
private ExecutorFactory executorFactory = new DefaultExecutorFactory();
private int jobThreadCount = 1;
private Map<String, Job.Factory> jobFactories = new HashMap<>();
private Map<String, Constraint.Factory> constraintFactories = new HashMap<>();
private List<ConstraintObserver> constraintObservers = new ArrayList<>();
private Data.Serializer dataSerializer = new JsonDataSerializer();
private JobStorage jobStorage = null;
public @NonNull Builder setJobFactories(@NonNull Map<String, Job.Factory> jobFactories) {
this.jobFactories = jobFactories;
return this;
}
public @NonNull Builder setConstraintFactories(@NonNull Map<String, Constraint.Factory> constraintFactories) {
this.constraintFactories = constraintFactories;
return this;
}
public @NonNull Builder setConstraintObservers(@NonNull List<ConstraintObserver> constraintObservers) {
this.constraintObservers = constraintObservers;
return this;
}
public @NonNull Builder setDataSerializer(@NonNull Data.Serializer dataSerializer) {
this.dataSerializer = dataSerializer;
return this;
}
public @NonNull Builder setJobStorage(@NonNull JobStorage jobStorage) {
this.jobStorage = jobStorage;
return this;
}
public @NonNull Configuration build() {
return new Configuration(jobThreadCount,
executorFactory,
new JobInstantiator(jobFactories),
new ConstraintInstantiator(constraintFactories),
new ArrayList<>(constraintObservers),
dataSerializer,
jobStorage);
return new Configuration(executorFactory,
new ArrayList<>(constraintObservers));
}
}
}

View File

@@ -1,110 +0,0 @@
package org.thoughtcrime.securesms.jobmanager;
import android.app.Application;
import android.os.PowerManager;
import androidx.annotation.NonNull;
import com.annimon.stream.Stream;
import org.session.libsignal.utilities.Log;
import org.thoughtcrime.securesms.util.WakeLockUtil;
import java.util.List;
import java.util.concurrent.TimeUnit;
class JobRunner extends Thread {
private static final String TAG = JobRunner.class.getSimpleName();
private static long WAKE_LOCK_TIMEOUT = TimeUnit.MINUTES.toMillis(10);
private final Application application;
private final int id;
private final JobController jobController;
JobRunner(@NonNull Application application, int id, @NonNull JobController jobController) {
super("JobRunner-" + id);
this.application = application;
this.id = id;
this.jobController = jobController;
}
@Override
public synchronized void run() {
while (true) {
Job job = jobController.pullNextEligibleJobForExecution();
Job.Result result = run(job);
jobController.onJobFinished(job);
switch (result) {
case SUCCESS:
jobController.onSuccess(job);
break;
case RETRY:
jobController.onRetry(job);
job.onRetry();
break;
case FAILURE:
List<Job> dependents = jobController.onFailure(job);
job.onCanceled();
Stream.of(dependents).forEach(Job::onCanceled);
break;
}
}
}
private Job.Result run(@NonNull Job job) {
Log.i(TAG, JobLogger.format(job, String.valueOf(id), "Running job."));
if (isJobExpired(job)) {
Log.w(TAG, JobLogger.format(job, String.valueOf(id), "Failing after surpassing its lifespan."));
return Job.Result.FAILURE;
}
Job.Result result = null;
PowerManager.WakeLock wakeLock = null;
try {
wakeLock = WakeLockUtil.acquire(application, PowerManager.PARTIAL_WAKE_LOCK, WAKE_LOCK_TIMEOUT, job.getId());
result = job.run();
} catch (Exception e) {
Log.w(TAG, JobLogger.format(job, String.valueOf(id), "Failing due to an unexpected exception."), e);
return Job.Result.FAILURE;
} finally {
if (wakeLock != null) {
WakeLockUtil.release(wakeLock, job.getId());
}
}
printResult(job, result);
if (result == Job.Result.RETRY && job.getRunAttempt() + 1 >= job.getParameters().getMaxAttempts() &&
job.getParameters().getMaxAttempts() != Job.Parameters.UNLIMITED)
{
Log.w(TAG, JobLogger.format(job, String.valueOf(id), "Failing after surpassing its max number of attempts."));
return Job.Result.FAILURE;
}
return result;
}
private boolean isJobExpired(@NonNull Job job) {
long expirationTime = job.getParameters().getCreateTime() + job.getParameters().getLifespan();
if (expirationTime < 0) {
expirationTime = Long.MAX_VALUE;
}
return job.getParameters().getLifespan() != Job.Parameters.IMMORTAL && expirationTime <= System.currentTimeMillis();
}
private void printResult(@NonNull Job job, @NonNull Job.Result result) {
if (result == Job.Result.FAILURE) {
Log.w(TAG, JobLogger.format(job, String.valueOf(id), "Job failed."));
} else {
Log.i(TAG, JobLogger.format(job, String.valueOf(id), "Job finished with result: " + result));
}
}
}

View File

@@ -1,90 +0,0 @@
package org.thoughtcrime.securesms.jobmanager;
import android.app.Application;
import android.app.job.JobInfo;
import android.app.job.JobParameters;
import android.app.job.JobScheduler;
import android.app.job.JobService;
import android.content.ComponentName;
import android.content.Context;
import android.content.SharedPreferences;
import androidx.annotation.NonNull;
import androidx.annotation.RequiresApi;
import org.thoughtcrime.securesms.ApplicationContext;
import org.session.libsignal.utilities.Log;
import java.util.List;
@RequiresApi(26)
public class JobSchedulerScheduler implements Scheduler {
private static final String TAG = JobSchedulerScheduler.class.getSimpleName();
private static final String PREF_NAME = "JobSchedulerScheduler_prefs";
private static final String PREF_NEXT_ID = "pref_next_id";
private static final int MAX_ID = 75;
private final Application application;
JobSchedulerScheduler(@NonNull Application application) {
this.application = application;
}
@RequiresApi(26)
@Override
public void schedule(long delay, @NonNull List<Constraint> constraints) {
JobInfo.Builder jobInfoBuilder = new JobInfo.Builder(getNextId(), new ComponentName(application, SystemService.class))
.setMinimumLatency(delay)
.setPersisted(true);
for (Constraint constraint : constraints) {
constraint.applyToJobInfo(jobInfoBuilder);
}
Log.i(TAG, "Scheduling a run in " + delay + " ms.");
JobScheduler jobScheduler = application.getSystemService(JobScheduler.class);
jobScheduler.schedule(jobInfoBuilder.build());
}
private int getNextId() {
SharedPreferences prefs = application.getSharedPreferences(PREF_NAME, Context.MODE_PRIVATE);
int returnedId = prefs.getInt(PREF_NEXT_ID, 0);
int nextId = returnedId + 1 > MAX_ID ? 0 : returnedId + 1;
prefs.edit().putInt(PREF_NEXT_ID, nextId).apply();
return returnedId;
}
@RequiresApi(api = 26)
public static class SystemService extends JobService {
@Override
public boolean onStartJob(JobParameters params) {
Log.d(TAG, "onStartJob()");
JobManager jobManager = ApplicationContext.getInstance(getApplicationContext()).getJobManager();
jobManager.addOnEmptyQueueListener(new JobManager.EmptyQueueListener() {
@Override
public void onQueueEmpty() {
jobManager.removeOnEmptyQueueListener(this);
jobFinished(params, false);
Log.d(TAG, "jobFinished()");
}
});
jobManager.wakeUp();
return true;
}
@Override
public boolean onStopJob(JobParameters params) {
Log.d(TAG, "onStopJob()");
return false;
}
}
}

View File

@@ -1,9 +0,0 @@
package org.thoughtcrime.securesms.jobmanager;
import androidx.annotation.NonNull;
import java.util.List;
public interface Scheduler {
void schedule(long delay, @NonNull List<Constraint> constraints);
}