Update job serialized data after retry.

This commit is contained in:
Greyson Parrelli 2019-11-13 23:58:59 -05:00
parent 97cc82837c
commit 6cd4728e3c
5 changed files with 14 additions and 11 deletions

View File

@ -126,11 +126,12 @@ public class JobDatabase extends Database {
databaseHelper.getWritableDatabase().update(Jobs.TABLE_NAME, contentValues, query, args); databaseHelper.getWritableDatabase().update(Jobs.TABLE_NAME, contentValues, query, args);
} }
public synchronized void updateJobAfterRetry(@NonNull String id, boolean isRunning, int runAttempt, long nextRunAttemptTime) { public synchronized void updateJobAfterRetry(@NonNull String id, boolean isRunning, int runAttempt, long nextRunAttemptTime, @NonNull String serializedData) {
ContentValues contentValues = new ContentValues(); ContentValues contentValues = new ContentValues();
contentValues.put(Jobs.IS_RUNNING, isRunning ? 1 : 0); contentValues.put(Jobs.IS_RUNNING, isRunning ? 1 : 0);
contentValues.put(Jobs.RUN_ATTEMPT, runAttempt); contentValues.put(Jobs.RUN_ATTEMPT, runAttempt);
contentValues.put(Jobs.NEXT_RUN_ATTEMPT_TIME, nextRunAttemptTime); contentValues.put(Jobs.NEXT_RUN_ATTEMPT_TIME, nextRunAttemptTime);
contentValues.put(Jobs.SERIALIZED_DATA, serializedData);
String query = Jobs.JOB_SPEC_ID + " = ?"; String query = Jobs.JOB_SPEC_ID + " = ?";
String[] args = new String[]{ id }; String[] args = new String[]{ id };

View File

@ -98,10 +98,11 @@ class JobController {
@WorkerThread @WorkerThread
synchronized void onRetry(@NonNull Job job) { synchronized void onRetry(@NonNull Job job) {
int nextRunAttempt = job.getRunAttempt() + 1; int nextRunAttempt = job.getRunAttempt() + 1;
long nextRunAttemptTime = calculateNextRunAttemptTime(System.currentTimeMillis(), nextRunAttempt, job.getParameters().getMaxBackoff()); long nextRunAttemptTime = calculateNextRunAttemptTime(System.currentTimeMillis(), nextRunAttempt, job.getParameters().getMaxBackoff());
String serializedData = dataSerializer.serialize(job.serialize());
jobStorage.updateJobAfterRetry(job.getId(), false, nextRunAttempt, nextRunAttemptTime); jobStorage.updateJobAfterRetry(job.getId(), false, nextRunAttempt, nextRunAttemptTime, serializedData);
jobTracker.onStateChange(job.getId(), JobTracker.JobState.PENDING); jobTracker.onStateChange(job.getId(), JobTracker.JobState.PENDING);
List<Constraint> constraints = Stream.of(jobStorage.getConstraintSpecs(job.getId())) List<Constraint> constraints = Stream.of(jobStorage.getConstraintSpecs(job.getId()))

View File

@ -30,7 +30,7 @@ public interface JobStorage {
void updateJobRunningState(@NonNull String id, boolean isRunning); void updateJobRunningState(@NonNull String id, boolean isRunning);
@WorkerThread @WorkerThread
void updateJobAfterRetry(@NonNull String id, boolean isRunning, int runAttempt, long nextRunAttemptTime); void updateJobAfterRetry(@NonNull String id, boolean isRunning, int runAttempt, long nextRunAttemptTime, @NonNull String serializedData);
@WorkerThread @WorkerThread
void updateAllJobsToBePending(); void updateAllJobsToBePending();

View File

@ -163,8 +163,8 @@ public class FastJobStorage implements JobStorage {
} }
@Override @Override
public synchronized void updateJobAfterRetry(@NonNull String id, boolean isRunning, int runAttempt, long nextRunAttemptTime) { public synchronized void updateJobAfterRetry(@NonNull String id, boolean isRunning, int runAttempt, long nextRunAttemptTime, @NonNull String serializedData) {
jobDatabase.updateJobAfterRetry(id, isRunning, runAttempt, nextRunAttemptTime); jobDatabase.updateJobAfterRetry(id, isRunning, runAttempt, nextRunAttemptTime, serializedData);
ListIterator<JobSpec> iter = jobs.listIterator(); ListIterator<JobSpec> iter = jobs.listIterator();
@ -181,7 +181,7 @@ public class FastJobStorage implements JobStorage {
existing.getMaxBackoff(), existing.getMaxBackoff(),
existing.getLifespan(), existing.getLifespan(),
existing.getMaxInstances(), existing.getMaxInstances(),
existing.getSerializedData(), serializedData,
isRunning); isRunning);
iter.set(updated); iter.set(updated);
} }

View File

@ -165,9 +165,9 @@ public class FastJobStorageTest {
JobDatabase database = noopDatabase(); JobDatabase database = noopDatabase();
FastJobStorage subject = new FastJobStorage(database); FastJobStorage subject = new FastJobStorage(database);
subject.updateJobAfterRetry("1", true, 1, 10); subject.updateJobAfterRetry("1", true, 1, 10, "a");
verify(database).updateJobAfterRetry("1", true, 1, 10); verify(database).updateJobAfterRetry("1", true, 1, 10, "a");
} }
@Test @Test
@ -179,7 +179,7 @@ public class FastJobStorageTest {
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Collections.singletonList(fullSpec))); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Collections.singletonList(fullSpec)));
subject.init(); subject.init();
subject.updateJobAfterRetry("1", false, 1, 10); subject.updateJobAfterRetry("1", false, 1, 10, "a");
JobSpec job = subject.getJobSpec("1"); JobSpec job = subject.getJobSpec("1");
@ -187,6 +187,7 @@ public class FastJobStorageTest {
assertFalse(job.isRunning()); assertFalse(job.isRunning());
assertEquals(1, job.getRunAttempt()); assertEquals(1, job.getRunAttempt());
assertEquals(10, job.getNextRunAttemptTime()); assertEquals(10, job.getNextRunAttemptTime());
assertEquals("a", job.getSerializedData());
} }
@Test @Test