Support for job "group ids."

A job can specify a group id, and jobs with the same group id
will run sequentially.
This commit is contained in:
Moxie Marlinspike 2014-08-03 18:10:58 -07:00
parent 544f06451f
commit 58d101ff2e
6 changed files with 97 additions and 4 deletions

View File

@ -86,5 +86,45 @@ public class JobManagerTest extends AndroidTestCase {
assertTrue(PersistentResult.getInstance().isRan()); assertTrue(PersistentResult.getInstance().isRan());
} }
public void testGroupIdExecution() throws InterruptedException {
final Object lock = new Object();
Runnable waitRunnable = new Runnable() {
@Override
public void run() {
try {
synchronized (lock) {
lock.wait();
}
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
};
TestJob testJobOne = new TestJob(JobParameters.newBuilder().withGroupId("foo").create(), waitRunnable);
TestJob testJobTwo = new TestJob(JobParameters.newBuilder().withGroupId("foo").create());
TestJob testJobThree = new TestJob(JobParameters.newBuilder().withGroupId("bar").create());
JobManager jobManager = new JobManager(getContext(), "transient-test", null, null, 3);
jobManager.add(testJobOne);
jobManager.add(testJobTwo);
jobManager.add(testJobThree);
assertTrue(testJobOne.isAdded());
assertTrue(testJobTwo.isAdded());
assertTrue(testJobThree.isAdded());
assertTrue(testJobOne.isRan());
assertTrue(!testJobTwo.isRan());
assertTrue(testJobThree.isRan());
synchronized (lock) {
lock.notifyAll();
}
assertTrue(testJobTwo.isRan());
}
} }

View File

@ -13,6 +13,8 @@ public class TestJob extends Job {
private boolean ran = false; private boolean ran = false;
private boolean canceled = false; private boolean canceled = false;
private Runnable runnable;
public TestJob() { public TestJob() {
this(JobParameters.newBuilder().create()); this(JobParameters.newBuilder().create());
} }
@ -21,6 +23,11 @@ public class TestJob extends Job {
super(parameters); super(parameters);
} }
public TestJob(JobParameters parameters, Runnable runnable) {
super(parameters);
this.runnable = runnable;
}
@Override @Override
public void onAdded() { public void onAdded() {
synchronized (ADDED_LOCK) { synchronized (ADDED_LOCK) {
@ -34,6 +41,9 @@ public class TestJob extends Job {
synchronized (RAN_LOCK) { synchronized (RAN_LOCK) {
this.ran = true; this.ran = true;
} }
if (runnable != null)
runnable.run();
} }
@Override @Override

View File

@ -27,6 +27,10 @@ public abstract class Job implements Serializable {
return true; return true;
} }
public String getGroupId() {
return parameters.getGroupId();
}
public boolean isPersistent() { public boolean isPersistent() {
return parameters.isPersistent(); return parameters.isPersistent();
} }

View File

@ -25,6 +25,10 @@ public class JobConsumer extends Thread {
if (job.isPersistent()) { if (job.isPersistent()) {
persistentStorage.remove(job.getPersistentId()); persistentStorage.remove(job.getPersistentId());
} }
if (job.getGroupId() != null) {
jobQueue.setGroupIdAvailable(job.getGroupId());
}
} }
} }

View File

@ -13,14 +13,16 @@ public class JobParameters implements Serializable {
private final List<Requirement> requirements; private final List<Requirement> requirements;
private final boolean isPersistent; private final boolean isPersistent;
private final int retryCount; private final int retryCount;
private final String groupId;
private JobParameters(List<Requirement> requirements, private JobParameters(List<Requirement> requirements,
boolean isPersistent, boolean isPersistent, String groupId,
EncryptionKeys encryptionKeys, EncryptionKeys encryptionKeys,
int retryCount) int retryCount)
{ {
this.requirements = requirements; this.requirements = requirements;
this.isPersistent = isPersistent; this.isPersistent = isPersistent;
this.groupId = groupId;
this.encryptionKeys = encryptionKeys; this.encryptionKeys = encryptionKeys;
this.retryCount = retryCount; this.retryCount = retryCount;
} }
@ -49,11 +51,16 @@ public class JobParameters implements Serializable {
return new Builder(); return new Builder();
} }
public String getGroupId() {
return groupId;
}
public static class Builder { public static class Builder {
private List<Requirement> requirements = new LinkedList<>(); private List<Requirement> requirements = new LinkedList<>();
private boolean isPersistent = false; private boolean isPersistent = false;
private EncryptionKeys encryptionKeys = null; private EncryptionKeys encryptionKeys = null;
private int retryCount = 100; private int retryCount = 100;
private String groupId = null;
public Builder withRequirement(Requirement requirement) { public Builder withRequirement(Requirement requirement) {
this.requirements.add(requirement); this.requirements.add(requirement);
@ -75,8 +82,13 @@ public class JobParameters implements Serializable {
return this; return this;
} }
public Builder withGroupId(String groupId) {
this.groupId = groupId;
return this;
}
public JobParameters create() { public JobParameters create() {
return new JobParameters(requirements, isPersistent, encryptionKeys, retryCount); return new JobParameters(requirements, isPersistent, groupId, encryptionKeys, retryCount);
} }
} }
} }

View File

@ -1,12 +1,17 @@
package org.whispersystems.jobqueue; package org.whispersystems.jobqueue;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
public class JobQueue { public class JobQueue {
private final LinkedList<Job> jobQueue = new LinkedList<>(); private final Set<String> activeGroupIds = new HashSet<>();
private final LinkedList<Job> jobQueue = new LinkedList<>();
public synchronized void onRequirementStatusChanged() { public synchronized void onRequirementStatusChanged() {
notifyAll(); notifyAll();
@ -36,6 +41,13 @@ public class JobQueue {
} }
} }
public synchronized void setGroupIdAvailable(String groupId) {
if (groupId != null) {
activeGroupIds.remove(groupId);
notifyAll();
}
}
private Job getNextAvailableJob() { private Job getNextAvailableJob() {
if (jobQueue.isEmpty()) return null; if (jobQueue.isEmpty()) return null;
@ -43,12 +55,23 @@ public class JobQueue {
while (iterator.hasNext()) { while (iterator.hasNext()) {
Job job = iterator.next(); Job job = iterator.next();
if (job.isRequirementsMet()) { if (job.isRequirementsMet() && isGroupIdAvailable(job.getGroupId())) {
iterator.remove(); iterator.remove();
setGroupIdUnavailable(job.getGroupId());
return job; return job;
} }
} }
return null; return null;
} }
private boolean isGroupIdAvailable(String groupId) {
return groupId == null || !activeGroupIds.contains(groupId);
}
private void setGroupIdUnavailable(String groupId) {
if (groupId != null) {
activeGroupIds.add(groupId);
}
}
} }