diff --git a/jobqueue/src/androidTest/java/org/whispersystems/jobqueue/JobManagerTest.java b/jobqueue/src/androidTest/java/org/whispersystems/jobqueue/JobManagerTest.java index 312de751bc..92c6d14dfa 100644 --- a/jobqueue/src/androidTest/java/org/whispersystems/jobqueue/JobManagerTest.java +++ b/jobqueue/src/androidTest/java/org/whispersystems/jobqueue/JobManagerTest.java @@ -86,5 +86,45 @@ public class JobManagerTest extends AndroidTestCase { assertTrue(PersistentResult.getInstance().isRan()); } + public void testGroupIdExecution() throws InterruptedException { + final Object lock = new Object(); + + Runnable waitRunnable = new Runnable() { + @Override + public void run() { + try { + synchronized (lock) { + lock.wait(); + } + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + }; + + TestJob testJobOne = new TestJob(JobParameters.newBuilder().withGroupId("foo").create(), waitRunnable); + TestJob testJobTwo = new TestJob(JobParameters.newBuilder().withGroupId("foo").create()); + TestJob testJobThree = new TestJob(JobParameters.newBuilder().withGroupId("bar").create()); + JobManager jobManager = new JobManager(getContext(), "transient-test", null, null, 3); + + jobManager.add(testJobOne); + jobManager.add(testJobTwo); + jobManager.add(testJobThree); + + assertTrue(testJobOne.isAdded()); + assertTrue(testJobTwo.isAdded()); + assertTrue(testJobThree.isAdded()); + + assertTrue(testJobOne.isRan()); + assertTrue(!testJobTwo.isRan()); + assertTrue(testJobThree.isRan()); + + synchronized (lock) { + lock.notifyAll(); + } + + assertTrue(testJobTwo.isRan()); + } + } diff --git a/jobqueue/src/androidTest/java/org/whispersystems/jobqueue/jobs/TestJob.java b/jobqueue/src/androidTest/java/org/whispersystems/jobqueue/jobs/TestJob.java index 228d77d6c7..9b9d6fa855 100644 --- a/jobqueue/src/androidTest/java/org/whispersystems/jobqueue/jobs/TestJob.java +++ b/jobqueue/src/androidTest/java/org/whispersystems/jobqueue/jobs/TestJob.java @@ -13,6 +13,8 @@ public class TestJob extends Job { private boolean ran = false; private boolean canceled = false; + private Runnable runnable; + public TestJob() { this(JobParameters.newBuilder().create()); } @@ -21,6 +23,11 @@ public class TestJob extends Job { super(parameters); } + public TestJob(JobParameters parameters, Runnable runnable) { + super(parameters); + this.runnable = runnable; + } + @Override public void onAdded() { synchronized (ADDED_LOCK) { @@ -34,6 +41,9 @@ public class TestJob extends Job { synchronized (RAN_LOCK) { this.ran = true; } + + if (runnable != null) + runnable.run(); } @Override diff --git a/jobqueue/src/main/java/org/whispersystems/jobqueue/Job.java b/jobqueue/src/main/java/org/whispersystems/jobqueue/Job.java index ccc27a67c5..508da4f7a9 100644 --- a/jobqueue/src/main/java/org/whispersystems/jobqueue/Job.java +++ b/jobqueue/src/main/java/org/whispersystems/jobqueue/Job.java @@ -27,6 +27,10 @@ public abstract class Job implements Serializable { return true; } + public String getGroupId() { + return parameters.getGroupId(); + } + public boolean isPersistent() { return parameters.isPersistent(); } diff --git a/jobqueue/src/main/java/org/whispersystems/jobqueue/JobConsumer.java b/jobqueue/src/main/java/org/whispersystems/jobqueue/JobConsumer.java index fd187a7fb4..9dd391857b 100644 --- a/jobqueue/src/main/java/org/whispersystems/jobqueue/JobConsumer.java +++ b/jobqueue/src/main/java/org/whispersystems/jobqueue/JobConsumer.java @@ -25,6 +25,10 @@ public class JobConsumer extends Thread { if (job.isPersistent()) { persistentStorage.remove(job.getPersistentId()); } + + if (job.getGroupId() != null) { + jobQueue.setGroupIdAvailable(job.getGroupId()); + } } } diff --git a/jobqueue/src/main/java/org/whispersystems/jobqueue/JobParameters.java b/jobqueue/src/main/java/org/whispersystems/jobqueue/JobParameters.java index 2504d14618..589340a8cd 100644 --- a/jobqueue/src/main/java/org/whispersystems/jobqueue/JobParameters.java +++ b/jobqueue/src/main/java/org/whispersystems/jobqueue/JobParameters.java @@ -13,14 +13,16 @@ public class JobParameters implements Serializable { private final List requirements; private final boolean isPersistent; private final int retryCount; + private final String groupId; private JobParameters(List requirements, - boolean isPersistent, + boolean isPersistent, String groupId, EncryptionKeys encryptionKeys, int retryCount) { this.requirements = requirements; this.isPersistent = isPersistent; + this.groupId = groupId; this.encryptionKeys = encryptionKeys; this.retryCount = retryCount; } @@ -49,11 +51,16 @@ public class JobParameters implements Serializable { return new Builder(); } + public String getGroupId() { + return groupId; + } + public static class Builder { private List requirements = new LinkedList<>(); private boolean isPersistent = false; private EncryptionKeys encryptionKeys = null; private int retryCount = 100; + private String groupId = null; public Builder withRequirement(Requirement requirement) { this.requirements.add(requirement); @@ -75,8 +82,13 @@ public class JobParameters implements Serializable { return this; } + public Builder withGroupId(String groupId) { + this.groupId = groupId; + return this; + } + public JobParameters create() { - return new JobParameters(requirements, isPersistent, encryptionKeys, retryCount); + return new JobParameters(requirements, isPersistent, groupId, encryptionKeys, retryCount); } } } diff --git a/jobqueue/src/main/java/org/whispersystems/jobqueue/JobQueue.java b/jobqueue/src/main/java/org/whispersystems/jobqueue/JobQueue.java index 2901c900d1..91e3a90213 100644 --- a/jobqueue/src/main/java/org/whispersystems/jobqueue/JobQueue.java +++ b/jobqueue/src/main/java/org/whispersystems/jobqueue/JobQueue.java @@ -1,12 +1,17 @@ package org.whispersystems.jobqueue; +import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.ListIterator; +import java.util.Map; +import java.util.Set; public class JobQueue { - private final LinkedList jobQueue = new LinkedList<>(); + private final Set activeGroupIds = new HashSet<>(); + private final LinkedList jobQueue = new LinkedList<>(); public synchronized void onRequirementStatusChanged() { notifyAll(); @@ -36,6 +41,13 @@ public class JobQueue { } } + public synchronized void setGroupIdAvailable(String groupId) { + if (groupId != null) { + activeGroupIds.remove(groupId); + notifyAll(); + } + } + private Job getNextAvailableJob() { if (jobQueue.isEmpty()) return null; @@ -43,12 +55,23 @@ public class JobQueue { while (iterator.hasNext()) { Job job = iterator.next(); - if (job.isRequirementsMet()) { + if (job.isRequirementsMet() && isGroupIdAvailable(job.getGroupId())) { iterator.remove(); + setGroupIdUnavailable(job.getGroupId()); return job; } } return null; } + + private boolean isGroupIdAvailable(String groupId) { + return groupId == null || !activeGroupIds.contains(groupId); + } + + private void setGroupIdUnavailable(String groupId) { + if (groupId != null) { + activeGroupIds.add(groupId); + } + } }