Add support for passing data between jobs.

This commit is contained in:
Greyson Parrelli
2020-04-10 13:13:27 -04:00
parent acbfff89d3
commit 35f4f3f81e
12 changed files with 168 additions and 66 deletions

View File

@@ -40,6 +40,7 @@ public class JobDatabase extends Database {
private static final String MAX_INSTANCES = "max_instances";
private static final String LIFESPAN = "lifespan";
private static final String SERIALIZED_DATA = "serialized_data";
private static final String SERIALIZED_INPUT_DATA = "serialized_input_data";
private static final String IS_RUNNING = "is_running";
private static final String CREATE_TABLE = "CREATE TABLE " + TABLE_NAME + "(" + ID + " INTEGER PRIMARY KEY AUTOINCREMENT, " +
@@ -54,6 +55,7 @@ public class JobDatabase extends Database {
MAX_INSTANCES + " INTEGER, " +
LIFESPAN + " INTEGER, " +
SERIALIZED_DATA + " TEXT, " +
SERIALIZED_INPUT_DATA + " TEXT DEFAULT NULL, " +
IS_RUNNING + " INTEGER)";
}
@@ -165,6 +167,7 @@ public class JobDatabase extends Database {
values.put(Jobs.MAX_INSTANCES, job.getMaxInstances());
values.put(Jobs.LIFESPAN, job.getLifespan());
values.put(Jobs.SERIALIZED_DATA, job.getSerializedData());
values.put(Jobs.SERIALIZED_INPUT_DATA, job.getSerializedInputData());
values.put(Jobs.IS_RUNNING, job.isRunning() ? 1 : 0);
String query = Jobs.JOB_SPEC_ID + " = ?";
@@ -237,6 +240,7 @@ public class JobDatabase extends Database {
contentValues.put(Jobs.MAX_INSTANCES, job.getMaxInstances());
contentValues.put(Jobs.LIFESPAN, job.getLifespan());
contentValues.put(Jobs.SERIALIZED_DATA, job.getSerializedData());
contentValues.put(Jobs.SERIALIZED_INPUT_DATA, job.getSerializedInputData());
contentValues.put(Jobs.IS_RUNNING, job.isRunning() ? 1 : 0);
db.insertWithOnConflict(Jobs.TABLE_NAME, null, contentValues, SQLiteDatabase.CONFLICT_IGNORE);
@@ -272,6 +276,7 @@ public class JobDatabase extends Database {
cursor.getLong(cursor.getColumnIndexOrThrow(Jobs.LIFESPAN)),
cursor.getInt(cursor.getColumnIndexOrThrow(Jobs.MAX_INSTANCES)),
cursor.getString(cursor.getColumnIndexOrThrow(Jobs.SERIALIZED_DATA)),
cursor.getString(cursor.getColumnIndexOrThrow(Jobs.SERIALIZED_INPUT_DATA)),
cursor.getInt(cursor.getColumnIndexOrThrow(Jobs.IS_RUNNING)) == 1);
}

View File

@@ -127,8 +127,9 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper {
private static final int GROUPS_V2 = 55;
private static final int ATTACHMENT_UPLOAD_TIMESTAMP = 56;
private static final int ATTACHMENT_CDN_NUMBER = 57;
private static final int JOB_INPUT_DATA = 58;
private static final int DATABASE_VERSION = ATTACHMENT_CDN_NUMBER;
private static final int DATABASE_VERSION = 58;
private static final String DATABASE_NAME = "signal.db";
private final Context context;
@@ -868,6 +869,10 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper {
db.execSQL("ALTER TABLE part ADD COLUMN cdn_number INTEGER DEFAULT 0");
}
if (oldVersion < JOB_INPUT_DATA) {
db.execSQL("ALTER TABLE job_spec ADD COLUMN serialized_input_data TEXT DEFAULT NULL");
}
db.setTransactionSuccessful();
} finally {
db.endTransaction();

View File

@@ -11,6 +11,7 @@ import org.thoughtcrime.securesms.logging.Log;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -59,6 +60,14 @@ public abstract class Job {
return nextRunAttemptTime;
}
public final @Nullable Data getInputData() {
return parameters.getInputData();
}
public final @NonNull Data requireInputData() {
return Objects.requireNonNull(parameters.getInputData());
}
/**
* This is already called by {@link JobController} during job submission, but if you ever run a
* job without submitting it to the {@link JobManager}, then you'll need to invoke this yourself.
@@ -140,21 +149,28 @@ public abstract class Job {
public static final class Result {
private static final Result SUCCESS = new Result(ResultType.SUCCESS, null);
private static final Result RETRY = new Result(ResultType.RETRY, null);
private static final Result FAILURE = new Result(ResultType.FAILURE, null);
private static final Result SUCCESS_NO_DATA = new Result(ResultType.SUCCESS, null, null);
private static final Result RETRY = new Result(ResultType.RETRY, null, null);
private static final Result FAILURE = new Result(ResultType.FAILURE, null, null);
private final ResultType resultType;
private final RuntimeException runtimeException;
private final Data outputData;
private Result(@NonNull ResultType resultType, @Nullable RuntimeException runtimeException) {
private Result(@NonNull ResultType resultType, @Nullable RuntimeException runtimeException, @Nullable Data outputData) {
this.resultType = resultType;
this.runtimeException = runtimeException;
this.outputData = outputData;
}
/** Job completed successfully. */
public static Result success() {
return SUCCESS;
return SUCCESS_NO_DATA;
}
/** Job completed successfully and wants to provide some output data. */
public static Result success(@Nullable Data outputData) {
return new Result(ResultType.SUCCESS, null, outputData);
}
/** Job did not complete successfully, but it can be retried later. */
@@ -169,7 +185,7 @@ public abstract class Job {
/** Same as {@link #failure()}, except the app should also crash with the provided exception. */
public static Result fatalFailure(@NonNull RuntimeException runtimeException) {
return new Result(ResultType.FAILURE, runtimeException);
return new Result(ResultType.FAILURE, runtimeException, null);
}
boolean isSuccess() {
@@ -188,6 +204,10 @@ public abstract class Job {
return runtimeException;
}
@Nullable Data getOutputData() {
return outputData;
}
@Override
public @NonNull String toString() {
switch (resultType) {
@@ -224,6 +244,7 @@ public abstract class Job {
private final int maxInstances;
private final String queue;
private final List<String> constraintKeys;
private final Data inputData;
private Parameters(@NonNull String id,
long createTime,
@@ -232,7 +253,8 @@ public abstract class Job {
long maxBackoff,
int maxInstances,
@Nullable String queue,
@NonNull List<String> constraintKeys)
@NonNull List<String> constraintKeys,
@Nullable Data inputData)
{
this.id = id;
this.createTime = createTime;
@@ -242,6 +264,7 @@ public abstract class Job {
this.maxInstances = maxInstances;
this.queue = queue;
this.constraintKeys = constraintKeys;
this.inputData = inputData;
}
@NonNull String getId() {
@@ -276,8 +299,12 @@ public abstract class Job {
return constraintKeys;
}
@Nullable Data getInputData() {
return inputData;
}
public Builder toBuilder() {
return new Builder(id, createTime, maxBackoff, lifespan, maxAttempts, maxInstances, queue, constraintKeys);
return new Builder(id, createTime, maxBackoff, lifespan, maxAttempts, maxInstances, queue, constraintKeys, inputData);
}
@@ -291,13 +318,14 @@ public abstract class Job {
private int maxInstances;
private String queue;
private List<String> constraintKeys;
private Data inputData;
public Builder() {
this(UUID.randomUUID().toString());
}
Builder(@NonNull String id) {
this(id, System.currentTimeMillis(), TimeUnit.SECONDS.toMillis(30), IMMORTAL, 1, UNLIMITED, null, new LinkedList<>());
this(id, System.currentTimeMillis(), TimeUnit.SECONDS.toMillis(30), IMMORTAL, 1, UNLIMITED, null, new LinkedList<>(), null);
}
private Builder(@NonNull String id,
@@ -307,7 +335,8 @@ public abstract class Job {
int maxAttempts,
int maxInstances,
@Nullable String queue,
@NonNull List<String> constraintKeys)
@NonNull List<String> constraintKeys,
@Nullable Data inputData)
{
this.id = id;
this.createTime = createTime;
@@ -317,6 +346,7 @@ public abstract class Job {
this.maxInstances = maxInstances;
this.queue = queue;
this.constraintKeys = constraintKeys;
this.inputData = inputData;
}
/** Should only be invoked by {@link JobController} */
@@ -394,8 +424,17 @@ public abstract class Job {
return this;
}
/**
* Sets the input data that will be made availabe to the job when it is run.
* Should only be set by {@link JobController}.
*/
@NonNull Builder setInputData(@Nullable Data inputData) {
this.inputData = inputData;
return this;
}
public @NonNull Parameters build() {
return new Parameters(id, createTime, lifespan, maxAttempts, maxBackoff, maxInstances, queue, constraintKeys);
return new Parameters(id, createTime, lifespan, maxAttempts, maxBackoff, maxInstances, queue, constraintKeys, inputData);
}
}
}

View File

@@ -170,7 +170,17 @@ class JobController {
}
@WorkerThread
synchronized void onSuccess(@NonNull Job job) {
synchronized void onSuccess(@NonNull Job job, @Nullable Data outputData) {
if (outputData != null) {
List<JobSpec> updates = Stream.of(jobStorage.getDependencySpecsThatDependOnJob(job.getId()))
.map(DependencySpec::getJobId)
.map(jobStorage::getJobSpec)
.map(jobSpec -> mapToJobWithInputData(jobSpec, outputData))
.toList();
jobStorage.updateJobs(updates);
}
jobStorage.deleteJob(job.getId());
jobTracker.onStateChange(job, JobTracker.JobState.SUCCESS);
notifyAll();
@@ -321,6 +331,7 @@ class JobController {
job.getParameters().getLifespan(),
job.getParameters().getMaxInstances(),
dataSerializer.serialize(job.serialize()),
null,
false);
List<ConstraintSpec> constraintSpecs = Stream.of(job.getParameters().getConstraintKeys())
@@ -402,6 +413,7 @@ class JobController {
.setQueue(jobSpec.getQueueKey())
.setConstraints(Stream.of(constraintSpecs).map(ConstraintSpec::getFactoryKey).toList())
.setMaxBackoff(jobSpec.getMaxBackoff())
.setInputData(jobSpec.getSerializedInputData() != null ? dataSerializer.deserialize(jobSpec.getSerializedInputData()) : null)
.build();
}
@@ -413,6 +425,22 @@ class JobController {
return currentTime + actualBackoff;
}
private @NonNull JobSpec mapToJobWithInputData(@NonNull JobSpec jobSpec, @NonNull Data inputData) {
return new JobSpec(jobSpec.getId(),
jobSpec.getFactoryKey(),
jobSpec.getQueueKey(),
jobSpec.getCreateTime(),
jobSpec.getNextRunAttemptTime(),
jobSpec.getRunAttempt(),
jobSpec.getMaxAttempts(),
jobSpec.getMaxBackoff(),
jobSpec.getLifespan(),
jobSpec.getMaxInstances(),
jobSpec.getSerializedData(),
dataSerializer.serialize(inputData),
jobSpec.isRunning());
}
interface Callback {
void onEmpty();
}

View File

@@ -75,6 +75,7 @@ public class JobMigrator {
jobSpec.getLifespan(),
jobSpec.getMaxInstances(),
dataSerializer.serialize(updatedJobData.getData()),
jobSpec.getSerializedInputData(),
jobSpec.isRunning());
iter.set(updatedJobSpec);

View File

@@ -48,7 +48,7 @@ class JobRunner extends Thread {
jobController.onJobFinished(job);
if (result.isSuccess()) {
jobController.onSuccess(job);
jobController.onSuccess(job, result.getOutputData());
} else if (result.isRetry()) {
jobController.onRetry(job);
job.onRetry();

View File

@@ -19,6 +19,7 @@ public final class JobSpec {
private final long lifespan;
private final int maxInstances;
private final String serializedData;
private final String serializedInputData;
private final boolean isRunning;
public JobSpec(@NonNull String id,
@@ -32,20 +33,22 @@ public final class JobSpec {
long lifespan,
int maxInstances,
@NonNull String serializedData,
@Nullable String serializedInputData,
boolean isRunning)
{
this.id = id;
this.factoryKey = factoryKey;
this.queueKey = queueKey;
this.createTime = createTime;
this.nextRunAttemptTime = nextRunAttemptTime;
this.maxBackoff = maxBackoff;
this.runAttempt = runAttempt;
this.maxAttempts = maxAttempts;
this.lifespan = lifespan;
this.maxInstances = maxInstances;
this.serializedData = serializedData;
this.isRunning = isRunning;
this.id = id;
this.factoryKey = factoryKey;
this.queueKey = queueKey;
this.createTime = createTime;
this.nextRunAttemptTime = nextRunAttemptTime;
this.maxBackoff = maxBackoff;
this.runAttempt = runAttempt;
this.maxAttempts = maxAttempts;
this.lifespan = lifespan;
this.maxInstances = maxInstances;
this.serializedData = serializedData;
this.serializedInputData = serializedInputData;
this.isRunning = isRunning;
}
public @NonNull String getId() {
@@ -92,6 +95,10 @@ public final class JobSpec {
return serializedData;
}
public @Nullable String getSerializedInputData() {
return serializedInputData;
}
public boolean isRunning() {
return isRunning;
}
@@ -112,18 +119,19 @@ public final class JobSpec {
Objects.equals(id, jobSpec.id) &&
Objects.equals(factoryKey, jobSpec.factoryKey) &&
Objects.equals(queueKey, jobSpec.queueKey) &&
Objects.equals(serializedData, jobSpec.serializedData);
Objects.equals(serializedData, jobSpec.serializedData) &&
Objects.equals(serializedInputData, jobSpec.serializedInputData);
}
@Override
public int hashCode() {
return Objects.hash(id, factoryKey, queueKey, createTime, nextRunAttemptTime, runAttempt, maxAttempts, maxBackoff, lifespan, maxInstances, serializedData, isRunning);
return Objects.hash(id, factoryKey, queueKey, createTime, nextRunAttemptTime, runAttempt, maxAttempts, maxBackoff, lifespan, maxInstances, serializedData, serializedInputData, isRunning);
}
@SuppressLint("DefaultLocale")
@Override
public @NonNull String toString() {
return String.format("id: JOB::%s | factoryKey: %s | queueKey: %s | createTime: %d | nextRunAttemptTime: %d | runAttempt: %d | maxAttempts: %d | maxBackoff: %d | maxInstances: %d | lifespan: %d | isRunning: %b | data: %s",
id, factoryKey, queueKey, createTime, nextRunAttemptTime, runAttempt, maxAttempts, maxBackoff, maxInstances, lifespan, isRunning, serializedData);
return String.format("id: JOB::%s | factoryKey: %s | queueKey: %s | createTime: %d | nextRunAttemptTime: %d | runAttempt: %d | maxAttempts: %d | maxBackoff: %d | maxInstances: %d | lifespan: %d | isRunning: %b | data: %s | inputData: %s",
id, factoryKey, queueKey, createTime, nextRunAttemptTime, runAttempt, maxAttempts, maxBackoff, maxInstances, lifespan, isRunning, serializedData, serializedInputData);
}
}

View File

@@ -68,6 +68,7 @@ final class WorkManagerDatabase extends SQLiteOpenHelper {
TimeUnit.DAYS.toMillis(1),
Job.Parameters.UNLIMITED,
dataSerializer.serialize(DataMigrator.convert(data)),
null,
false);

View File

@@ -3,14 +3,18 @@ package org.thoughtcrime.securesms.jobs;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.JobLogger;
import org.thoughtcrime.securesms.jobmanager.JobManager.Chain;
import org.thoughtcrime.securesms.logging.Log;
public abstract class BaseJob extends Job {
private static final String TAG = BaseJob.class.getSimpleName();
private Data outputData;
public BaseJob(@NonNull Parameters parameters) {
super(parameters);
}
@@ -19,7 +23,7 @@ public abstract class BaseJob extends Job {
public @NonNull Result run() {
try {
onRun();
return Result.success();
return Result.success(outputData);
} catch (RuntimeException e) {
Log.e(TAG, "Encountered a fatal exception. Crash imminent.", e);
return Result.fatalFailure(e);
@@ -38,6 +42,14 @@ public abstract class BaseJob extends Job {
protected abstract boolean onShouldRetry(@NonNull Exception e);
/**
* If this job is part of a {@link Chain}, data set here will be passed as input data to the next
* job(s) in the chain.
*/
protected void setOutputData(@Nullable Data outputData) {
this.outputData = outputData;
}
protected void log(@NonNull String tag, @NonNull String message) {
Log.i(tag, JobLogger.format(this, message));
}

View File

@@ -156,6 +156,7 @@ public class FastJobStorage implements JobStorage {
existing.getLifespan(),
existing.getMaxInstances(),
existing.getSerializedData(),
existing.getSerializedInputData(),
isRunning);
iter.set(updated);
}
@@ -182,6 +183,7 @@ public class FastJobStorage implements JobStorage {
existing.getLifespan(),
existing.getMaxInstances(),
serializedData,
existing.getSerializedInputData(),
isRunning);
iter.set(updated);
}
@@ -207,6 +209,7 @@ public class FastJobStorage implements JobStorage {
existing.getLifespan(),
existing.getMaxInstances(),
existing.getSerializedData(),
existing.getSerializedInputData(),
false);
iter.set(updated);
}