Prevent waiting for old queues in our retrieval strategies.

This commit is contained in:
Greyson Parrelli 2020-05-29 09:55:47 -04:00
parent b9f11dafff
commit 6b2e000e61
2 changed files with 2 additions and 10 deletions

View File

@ -37,14 +37,6 @@ public class JobTracker {
*/ */
synchronized void addListener(@NonNull JobFilter filter, @NonNull JobListener listener) { synchronized void addListener(@NonNull JobFilter filter, @NonNull JobListener listener) {
jobListeners.add(new ListenerInfo(filter, listener)); jobListeners.add(new ListenerInfo(filter, listener));
Stream.of(jobInfos.values())
.filter(info -> info.getJobState() != null)
.filter(info -> filter.matches(info.getJob()))
.forEach(state-> {
//noinspection ConstantConditions We already filter for nulls above
listenerExecutor.execute(() -> listener.onStateChanged(state.getJob(), state.getJobState()));
});
} }
/** /**

View File

@ -34,6 +34,8 @@ public class RestStrategy extends MessageRetrievalStrategy {
QueueFindingJobListener queueListener = new QueueFindingJobListener(); QueueFindingJobListener queueListener = new QueueFindingJobListener();
try (IncomingMessageProcessor.Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) { try (IncomingMessageProcessor.Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) {
jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener);
int jobCount = enqueuePushDecryptJobs(processor, startTime, timeout); int jobCount = enqueuePushDecryptJobs(processor, startTime, timeout);
if (jobCount == 0) { if (jobCount == 0) {
@ -43,8 +45,6 @@ public class RestStrategy extends MessageRetrievalStrategy {
Log.d(TAG, jobCount + " PushDecryptMessageJob(s) were enqueued."); Log.d(TAG, jobCount + " PushDecryptMessageJob(s) were enqueued.");
} }
jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener);
long timeRemainingMs = blockUntilQueueDrained(PushDecryptMessageJob.QUEUE, TimeUnit.SECONDS.toMillis(10)); long timeRemainingMs = blockUntilQueueDrained(PushDecryptMessageJob.QUEUE, TimeUnit.SECONDS.toMillis(10));
Set<String> processQueues = queueListener.getQueues(); Set<String> processQueues = queueListener.getQueues();