Check job requirements are satisfied in between retry iterations.

// FREEBIE
This commit is contained in:
Moxie Marlinspike 2014-10-21 13:50:38 -07:00
parent 4cdc0a3e61
commit c3eb0ea9db
2 changed files with 34 additions and 11 deletions

View File

@ -26,6 +26,7 @@ public abstract class Job implements Serializable {
private final JobParameters parameters; private final JobParameters parameters;
private transient long persistentId; private transient long persistentId;
private transient int runIteration;
public Job(JobParameters parameters) { public Job(JobParameters parameters) {
this.parameters = parameters; this.parameters = parameters;
@ -71,6 +72,14 @@ public abstract class Job implements Serializable {
return persistentId; return persistentId;
} }
public int getRunIteration() {
return runIteration;
}
public void setRunIteration(int runIteration) {
this.runIteration = runIteration;
}
public abstract void onAdded(); public abstract void onAdded();
public abstract void onRun() throws Throwable; public abstract void onRun() throws Throwable;
public abstract void onCanceled(); public abstract void onCanceled();

View File

@ -20,6 +20,12 @@ import org.whispersystems.jobqueue.persistence.PersistentStorage;
public class JobConsumer extends Thread { public class JobConsumer extends Thread {
enum JobResult {
SUCCESS,
FAILURE,
DEFERRED
}
private final JobQueue jobQueue; private final JobQueue jobQueue;
private final PersistentStorage persistentStorage; private final PersistentStorage persistentStorage;
@ -34,12 +40,16 @@ public class JobConsumer extends Thread {
while (true) { while (true) {
Job job = jobQueue.getNext(); Job job = jobQueue.getNext();
if (!runJob(job)) { JobResult result;
job.onCanceled();
}
if (job.isPersistent()) { if ((result = runJob(job)) != JobResult.DEFERRED) {
persistentStorage.remove(job.getPersistentId()); if (result == JobResult.FAILURE) {
job.onCanceled();
}
if (job.isPersistent()) {
persistentStorage.remove(job.getPersistentId());
}
} }
if (job.getGroupId() != null) { if (job.getGroupId() != null) {
@ -48,21 +58,25 @@ public class JobConsumer extends Thread {
} }
} }
private boolean runJob(Job job) { private JobResult runJob(Job job) {
int retryCount = job.getRetryCount(); int retryCount = job.getRetryCount();
int runIteration = job.getRunIteration();
for (int i=retryCount;i>0;i--) { for (;runIteration<retryCount;runIteration++) {
try { try {
job.onRun(); job.onRun();
return true; return JobResult.SUCCESS;
} catch (Throwable throwable) { } catch (Throwable throwable) {
if (!job.onShouldRetry(throwable)) { if (!job.onShouldRetry(throwable)) {
return false; return JobResult.FAILURE;
} else if (!job.isRequirementsMet()) {
job.setRunIteration(runIteration+1);
return JobResult.DEFERRED;
} }
} }
} }
return false; return JobResult.FAILURE;
} }
} }