diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java index e3dab15820..43329406c8 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java @@ -134,7 +134,7 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr .setJobFactories(JobManagerFactories.getJobFactories(context)) .setConstraintFactories(JobManagerFactories.getConstraintFactories(context)) .setConstraintObservers(JobManagerFactories.getConstraintObservers(context)) - .setJobStorage(new FastJobStorage(DatabaseFactory.getJobDatabase(context))) + .setJobStorage(new FastJobStorage(DatabaseFactory.getJobDatabase(context), SignalExecutors.newCachedSingleThreadExecutor("signal-fast-job-storage"))) .setJobMigrator(new JobMigrator(TextSecurePreferences.getJobManagerVersion(context), JobManager.CURRENT_VERSION, JobManagerFactories.getJobMigrations(context))) .build()); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java index ca5b354b71..d26a917152 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java @@ -77,6 +77,11 @@ class JobController { notifyAll(); } + @WorkerThread + synchronized void flush() { + jobStorage.flush(); + } + @WorkerThread synchronized void submitNewJobChain(@NonNull List> chain) { chain = Stream.of(chain).filterNot(List::isEmpty).toList(); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java index 071522cdce..d2c9137561 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java @@ -3,6 +3,8 @@ package org.thoughtcrime.securesms.jobmanager; import android.app.Application; import android.content.Intent; import android.os.Build; + +import androidx.annotation.GuardedBy; import androidx.annotation.NonNull; import androidx.annotation.Nullable; import androidx.annotation.WorkerThread; @@ -14,6 +16,8 @@ import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage; import org.thoughtcrime.securesms.logging.Log; import org.thoughtcrime.securesms.util.Debouncer; import org.thoughtcrime.securesms.util.TextSecurePreferences; +import org.thoughtcrime.securesms.util.Util; +import org.thoughtcrime.securesms.util.concurrent.NonMainThreadExecutor; import org.whispersystems.libsignal.util.guava.Optional; import java.util.ArrayList; @@ -26,9 +30,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -41,18 +43,21 @@ public class JobManager implements ConstraintObserver.Notifier { public static final int CURRENT_VERSION = 7; - private final Application application; - private final Configuration configuration; - private final ExecutorService executor; - private final JobController jobController; - private final JobTracker jobTracker; + private final Application application; + private final Configuration configuration; + private final Executor executor; + private final JobController jobController; + private final JobTracker jobTracker; + @GuardedBy("emptyQueueListeners") private final Set emptyQueueListeners = new CopyOnWriteArraySet<>(); + private volatile boolean initialized; + public JobManager(@NonNull Application application, @NonNull Configuration configuration) { this.application = application; this.configuration = configuration; - this.executor = configuration.getExecutorFactory().newSingleThreadExecutor("signal-JobManager"); + this.executor = new NonMainThreadExecutor(configuration.getExecutorFactory().newSingleThreadExecutor("signal-JobManager")); this.jobTracker = configuration.getJobTracker(); this.jobController = new JobController(application, configuration.getJobStorage(), @@ -66,25 +71,30 @@ public class JobManager implements ConstraintObserver.Notifier { this::onEmptyQueue); executor.execute(() -> { - if (WorkManagerMigrator.needsMigration(application)) { - Log.i(TAG, "Detected an old WorkManager database. Migrating."); - WorkManagerMigrator.migrate(application, configuration.getJobStorage(), configuration.getDataSerializer()); - } + synchronized (this) { + if (WorkManagerMigrator.needsMigration(application)) { + Log.i(TAG, "Detected an old WorkManager database. Migrating."); + WorkManagerMigrator.migrate(application, configuration.getJobStorage(), configuration.getDataSerializer()); + } - JobStorage jobStorage = configuration.getJobStorage(); - jobStorage.init(); + JobStorage jobStorage = configuration.getJobStorage(); + jobStorage.init(); - int latestVersion = configuration.getJobMigrator().migrate(jobStorage, configuration.getDataSerializer()); - TextSecurePreferences.setJobManagerVersion(application, latestVersion); + int latestVersion = configuration.getJobMigrator().migrate(jobStorage, configuration.getDataSerializer()); + TextSecurePreferences.setJobManagerVersion(application, latestVersion); - jobController.init(); + jobController.init(); - for (ConstraintObserver constraintObserver : configuration.getConstraintObservers()) { - constraintObserver.register(this); - } + for (ConstraintObserver constraintObserver : configuration.getConstraintObservers()) { + constraintObserver.register(this); + } - if (Build.VERSION.SDK_INT < 26) { - application.startService(new Intent(application, KeepAliveService.class)); + if (Build.VERSION.SDK_INT < 26) { + application.startService(new Intent(application, KeepAliveService.class)); + } + + initialized = true; + notifyAll(); } }); } @@ -93,11 +103,11 @@ public class JobManager implements ConstraintObserver.Notifier { * Begins the execution of jobs. */ public void beginJobLoop() { - executor.execute(() -> { + runOnExecutor(()-> { for (int i = 0; i < configuration.getJobThreadCount(); i++) { new JobRunner(application, i + 1, jobController).start(); } - wakeUp(); + jobController.wakeUp(); }); } @@ -138,9 +148,9 @@ public class JobManager implements ConstraintObserver.Notifier { public void add(@NonNull Job job, @NonNull Collection dependsOn) { jobTracker.onStateChange(job, JobTracker.JobState.PENDING); - executor.execute(() -> { + runOnExecutor(() -> { jobController.submitJobWithExistingDependencies(job, dependsOn, null); - wakeUp(); + jobController.wakeUp(); }); } @@ -151,9 +161,9 @@ public class JobManager implements ConstraintObserver.Notifier { public void add(@NonNull Job job, @Nullable String dependsOnQueue) { jobTracker.onStateChange(job, JobTracker.JobState.PENDING); - executor.execute(() -> { + runOnExecutor(() -> { jobController.submitJobWithExistingDependencies(job, Collections.emptyList(), dependsOnQueue); - wakeUp(); + jobController.wakeUp(); }); } @@ -164,9 +174,9 @@ public class JobManager implements ConstraintObserver.Notifier { public void add(@NonNull Job job, @NonNull Collection dependsOn, @Nullable String dependsOnQueue) { jobTracker.onStateChange(job, JobTracker.JobState.PENDING); - executor.execute(() -> { + runOnExecutor(() -> { jobController.submitJobWithExistingDependencies(job, dependsOn, dependsOnQueue); - wakeUp(); + jobController.wakeUp(); }); } @@ -195,14 +205,14 @@ public class JobManager implements ConstraintObserver.Notifier { * moment. Just like a normal failure, all later jobs in the same chain will also be failed. */ public void cancel(@NonNull String id) { - executor.execute(() -> jobController.cancelJob(id)); + runOnExecutor(() -> jobController.cancelJob(id)); } /** * Cancels all jobs in the specified queue. See {@link #cancel(String)} for details. */ public void cancelAllInQueue(@NonNull String queue) { - executor.execute(() -> jobController.cancelAllInQueue(queue)); + runOnExecutor(() -> jobController.cancelAllInQueue(queue)); } /** @@ -247,10 +257,22 @@ public class JobManager implements ConstraintObserver.Notifier { */ @WorkerThread public @NonNull String getDebugInfo() { - Future result = executor.submit(jobController::getDebugInfo); + AtomicReference result = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + + runOnExecutor(() -> { + result.set(jobController.getDebugInfo()); + latch.countDown(); + }); + try { - return result.get(); - } catch (ExecutionException | InterruptedException e) { + boolean finished = latch.await(10, TimeUnit.SECONDS); + if (finished) { + return result.get(); + } else { + return "Timed out waiting for Job info."; + } + } catch (InterruptedException e) { Log.w(TAG, "Failed to retrieve Job info.", e); return "Failed to retrieve Job info."; } @@ -260,8 +282,10 @@ public class JobManager implements ConstraintObserver.Notifier { * Adds a listener that will be notified when the job queue has been drained. */ void addOnEmptyQueueListener(@NonNull EmptyQueueListener listener) { - executor.execute(() -> { - emptyQueueListeners.add(listener); + runOnExecutor(() -> { + synchronized (emptyQueueListeners) { + emptyQueueListeners.add(listener); + } }); } @@ -269,8 +293,10 @@ public class JobManager implements ConstraintObserver.Notifier { * Removes a listener that was added via {@link #addOnEmptyQueueListener(EmptyQueueListener)}. */ void removeOnEmptyQueueListener(@NonNull EmptyQueueListener listener) { - executor.execute(() -> { - emptyQueueListeners.remove(listener); + runOnExecutor(() -> { + synchronized (emptyQueueListeners) { + emptyQueueListeners.remove(listener); + } }); } @@ -280,11 +306,31 @@ public class JobManager implements ConstraintObserver.Notifier { wakeUp(); } + /** + * Blocks until all pending operations are finished. + */ + @WorkerThread + public void flush() { + CountDownLatch latch = new CountDownLatch(1); + + runOnExecutor(() -> { + jobController.flush(); + latch.countDown(); + }); + + try { + latch.await(); + Log.i(TAG, "Successfully flushed."); + } catch (InterruptedException e) { + Log.w(TAG, "Failed to finish flushing.", e); + } + } + /** * Pokes the system to take another pass at the job queue. */ void wakeUp() { - executor.execute(jobController::wakeUp); + runOnExecutor(jobController::wakeUp); } private void enqueueChain(@NonNull Chain chain) { @@ -294,20 +340,46 @@ public class JobManager implements ConstraintObserver.Notifier { } } - executor.execute(() -> { + runOnExecutor(() -> { jobController.submitNewJobChain(chain.getJobListChain()); - wakeUp(); + jobController.wakeUp(); }); } private void onEmptyQueue() { - executor.execute(() -> { - for (EmptyQueueListener listener : emptyQueueListeners) { - listener.onQueueEmpty(); + runOnExecutor(() -> { + synchronized (emptyQueueListeners) { + for (EmptyQueueListener listener : emptyQueueListeners) { + listener.onQueueEmpty(); + } } }); } + /** + * Anything that you want to ensure happens off of the main thread and after initialization, run + * it through here. + */ + private void runOnExecutor(@NonNull Runnable runnable) { + executor.execute(() -> { + waitUntilInitialized(); + runnable.run(); + }); + } + + private void waitUntilInitialized() { + if (!initialized) { + Log.i(TAG, "Waiting for initialization..."); + synchronized (this) { + while (!initialized) { + Util.wait(this, 0); + } + } + Log.i(TAG, "Initialization complete."); + } + } + + public interface EmptyQueueListener { void onQueueEmpty(); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/WebsocketDrainedConstraintObserver.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/WebsocketDrainedConstraintObserver.java index 442f15e10a..7df093f2ea 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/WebsocketDrainedConstraintObserver.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/impl/WebsocketDrainedConstraintObserver.java @@ -13,10 +13,18 @@ public class WebsocketDrainedConstraintObserver implements ConstraintObserver { private static final String REASON = WebsocketDrainedConstraintObserver.class.getSimpleName(); - @Override - public void register(@NonNull Notifier notifier) { + private volatile Notifier notifier; + + public WebsocketDrainedConstraintObserver() { ApplicationDependencies.getInitialMessageRetriever().addListener(() -> { - notifier.onConstraintMet(REASON); + if (notifier != null) { + notifier.onConstraintMet(REASON); + } }); } + + @Override + public void register(@NonNull Notifier notifier) { + this.notifier = notifier; + } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.java index ff178a796f..3ee3fc9820 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.java @@ -11,6 +11,9 @@ public interface JobStorage { @WorkerThread void init(); + @WorkerThread + void flush(); + @WorkerThread void insertJobs(@NonNull List fullSpecs); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.java index ccf0df5309..2c1210343b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.java @@ -13,6 +13,7 @@ import org.thoughtcrime.securesms.jobmanager.persistence.DependencySpec; import org.thoughtcrime.securesms.jobmanager.persistence.FullSpec; import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec; import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage; +import org.thoughtcrime.securesms.logging.Log; import org.thoughtcrime.securesms.util.Util; import org.whispersystems.libsignal.util.guava.Optional; @@ -26,17 +27,23 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; public class FastJobStorage implements JobStorage { + private static final String TAG = Log.tag(FastJobStorage.class); + private final JobDatabase jobDatabase; + private final Executor serialExecutor; private final List jobs; private final Map> constraintsByJobId; private final Map> dependenciesByJobId; - public FastJobStorage(@NonNull JobDatabase jobDatabase) { + public FastJobStorage(@NonNull JobDatabase jobDatabase, @NonNull Executor serialExecutor) { this.jobDatabase = jobDatabase; + this.serialExecutor = serialExecutor; this.jobs = new ArrayList<>(); this.constraintsByJobId = new HashMap<>(); this.dependenciesByJobId = new HashMap<>(); @@ -63,9 +70,24 @@ public class FastJobStorage implements JobStorage { } } + @Override + public synchronized void flush() { + CountDownLatch latch = new CountDownLatch(1); + + serialExecutor.execute(latch::countDown); + + try { + latch.await(); + } catch (InterruptedException e) { + Log.w(TAG, "Interrupted while waiting to flush!", e); + } + } + @Override public synchronized void insertJobs(@NonNull List fullSpecs) { - jobDatabase.insertJobs(fullSpecs); + serialExecutor.execute(() -> { + jobDatabase.insertJobs(fullSpecs); + }); for (FullSpec fullSpec : fullSpecs) { jobs.add(fullSpec.getJobSpec()); @@ -146,7 +168,9 @@ public class FastJobStorage implements JobStorage { @Override public synchronized void updateJobRunningState(@NonNull String id, boolean isRunning) { - jobDatabase.updateJobRunningState(id, isRunning); + serialExecutor.execute(() -> { + jobDatabase.updateJobRunningState(id, isRunning); + }); ListIterator iter = jobs.listIterator(); @@ -173,7 +197,9 @@ public class FastJobStorage implements JobStorage { @Override public synchronized void updateJobAfterRetry(@NonNull String id, boolean isRunning, int runAttempt, long nextRunAttemptTime, @NonNull String serializedData) { - jobDatabase.updateJobAfterRetry(id, isRunning, runAttempt, nextRunAttemptTime, serializedData); + serialExecutor.execute(() -> { + jobDatabase.updateJobAfterRetry(id, isRunning, runAttempt, nextRunAttemptTime, serializedData); + }); ListIterator iter = jobs.listIterator(); @@ -200,8 +226,9 @@ public class FastJobStorage implements JobStorage { @Override public synchronized void updateAllJobsToBePending() { - jobDatabase.updateAllJobsToBePending(); - + serialExecutor.execute(() -> { + jobDatabase.updateAllJobsToBePending(); + }); ListIterator iter = jobs.listIterator(); while (iter.hasNext()) { @@ -225,7 +252,9 @@ public class FastJobStorage implements JobStorage { @Override public void updateJobs(@NonNull List jobSpecs) { - jobDatabase.updateJobs(jobSpecs); + serialExecutor.execute(() -> { + jobDatabase.updateJobs(jobSpecs); + }); Map updates = Stream.of(jobSpecs).collect(Collectors.toMap(JobSpec::getId)); ListIterator iter = jobs.listIterator(); @@ -247,7 +276,10 @@ public class FastJobStorage implements JobStorage { @Override public synchronized void deleteJobs(@NonNull List jobIds) { - jobDatabase.deleteJobs(jobIds); + serialExecutor.execute(() -> { + jobDatabase.deleteJobs(jobIds); + }); + Set deleteIds = new HashSet<>(jobIds); diff --git a/app/src/main/java/org/thoughtcrime/securesms/logging/SignalUncaughtExceptionHandler.java b/app/src/main/java/org/thoughtcrime/securesms/logging/SignalUncaughtExceptionHandler.java index c17d491494..d190c1482f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/logging/SignalUncaughtExceptionHandler.java +++ b/app/src/main/java/org/thoughtcrime/securesms/logging/SignalUncaughtExceptionHandler.java @@ -2,6 +2,7 @@ package org.thoughtcrime.securesms.logging; import androidx.annotation.NonNull; +import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.keyvalue.SignalStore; public class SignalUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { @@ -19,6 +20,7 @@ public class SignalUncaughtExceptionHandler implements Thread.UncaughtExceptionH Log.e(TAG, "", e); SignalStore.blockUntilAllWritesFinished(); Log.blockUntilAllWritesFinished(); + ApplicationDependencies.getJobManager().flush(); originalHandler.uncaughtException(t, e); } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/concurrent/NonMainThreadExecutor.java b/app/src/main/java/org/thoughtcrime/securesms/util/concurrent/NonMainThreadExecutor.java new file mode 100644 index 0000000000..4c241138d1 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/util/concurrent/NonMainThreadExecutor.java @@ -0,0 +1,29 @@ +package org.thoughtcrime.securesms.util.concurrent; + +import androidx.annotation.NonNull; + +import org.thoughtcrime.securesms.util.Util; + +import java.util.concurrent.Executor; + +/** + * If submitted on the main thread, the task will be executed on the provided background executor. + * Otherwise, the task will be run on the calling thread. + */ +public final class NonMainThreadExecutor implements Executor { + + private final Executor backgroundExecutor; + + public NonMainThreadExecutor(@NonNull Executor backgroundExecutor) { + this.backgroundExecutor = backgroundExecutor; + } + + @Override + public void execute(@NonNull Runnable runnable) { + if (Util.isMainThread()) { + backgroundExecutor.execute(runnable); + } else { + runnable.run(); + } + } +} diff --git a/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.java b/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.java index 458bce9c60..12d9d5e931 100644 --- a/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.java +++ b/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.java @@ -13,6 +13,7 @@ import org.thoughtcrime.securesms.jobmanager.persistence.ConstraintSpec; import org.thoughtcrime.securesms.jobmanager.persistence.DependencySpec; import org.thoughtcrime.securesms.jobmanager.persistence.FullSpec; import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec; +import org.thoughtcrime.securesms.testutil.DirectExecutor; import java.util.Arrays; import java.util.Collections; @@ -33,7 +34,7 @@ public class FastJobStorageTest { @Test public void init_allStoredDataAvailable() { - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS), new DirectExecutor()); subject.init(); @@ -45,7 +46,7 @@ public class FastJobStorageTest { @Test public void insertJobs_writesToDatabase() { JobDatabase database = noopDatabase(); - FastJobStorage subject = new FastJobStorage(database); + FastJobStorage subject = new FastJobStorage(database, new DirectExecutor()); subject.insertJobs(DataSet1.FULL_SPECS); @@ -54,7 +55,7 @@ public class FastJobStorageTest { @Test public void insertJobs_dataCanBeFound() { - FastJobStorage subject = new FastJobStorage(noopDatabase()); + FastJobStorage subject = new FastJobStorage(noopDatabase(), new DirectExecutor()); subject.insertJobs(DataSet1.FULL_SPECS); @@ -65,7 +66,7 @@ public class FastJobStorageTest { @Test public void insertJobs_individualJobCanBeFound() { - FastJobStorage subject = new FastJobStorage(noopDatabase()); + FastJobStorage subject = new FastJobStorage(noopDatabase(), new DirectExecutor()); subject.insertJobs(DataSet1.FULL_SPECS); @@ -76,7 +77,7 @@ public class FastJobStorageTest { @Test public void updateAllJobsToBePending_writesToDatabase() { JobDatabase database = noopDatabase(); - FastJobStorage subject = new FastJobStorage(database); + FastJobStorage subject = new FastJobStorage(database, new DirectExecutor()); subject.updateAllJobsToBePending(); @@ -92,7 +93,7 @@ public class FastJobStorageTest { Collections.emptyList(), Collections.emptyList()); - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2))); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2)), new DirectExecutor()); subject.init(); subject.updateAllJobsToBePending(); @@ -104,7 +105,7 @@ public class FastJobStorageTest { @Test public void updateJobs_writesToDatabase() { JobDatabase database = noopDatabase(); - FastJobStorage subject = new FastJobStorage(database); + FastJobStorage subject = new FastJobStorage(database, new DirectExecutor()); List jobs = Collections.emptyList(); subject.updateJobs(jobs); @@ -125,7 +126,7 @@ public class FastJobStorageTest { Collections.emptyList(), Collections.emptyList()); - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2, fullSpec3))); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2, fullSpec3)), new DirectExecutor()); JobSpec update1 = new JobSpec("1", "g1", "q1", 2, 2, 2, 2, 2, 2, 2, "abc", null, true); JobSpec update2 = new JobSpec("2", "g2", "q2", 3, 3, 3, 3, 3, 3, 3, "def", "ghi", true); @@ -141,7 +142,7 @@ public class FastJobStorageTest { @Test public void updateJobRunningState_writesToDatabase() { JobDatabase database = noopDatabase(); - FastJobStorage subject = new FastJobStorage(database); + FastJobStorage subject = new FastJobStorage(database, new DirectExecutor()); subject.updateJobRunningState("1", true); @@ -150,7 +151,7 @@ public class FastJobStorageTest { @Test public void updateJobRunningState_stateUpdated() { - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS), new DirectExecutor()); subject.init(); subject.updateJobRunningState(DataSet1.JOB_1.getId(), true); @@ -163,7 +164,7 @@ public class FastJobStorageTest { @Test public void updateJobAfterRetry_writesToDatabase() { JobDatabase database = noopDatabase(); - FastJobStorage subject = new FastJobStorage(database); + FastJobStorage subject = new FastJobStorage(database, new DirectExecutor()); subject.updateJobAfterRetry("1", true, 1, 10, "a"); @@ -176,7 +177,7 @@ public class FastJobStorageTest { Collections.emptyList(), Collections.emptyList()); - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Collections.singletonList(fullSpec))); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Collections.singletonList(fullSpec)), new DirectExecutor()); subject.init(); subject.updateJobAfterRetry("1", false, 1, 10, "a"); @@ -199,7 +200,7 @@ public class FastJobStorageTest { Collections.emptyList(), Collections.emptyList()); - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2))); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2)), new DirectExecutor()); subject.init(); assertEquals(0, subject.getPendingJobsWithNoDependenciesInCreatedOrder(1).size()); @@ -211,7 +212,7 @@ public class FastJobStorageTest { Collections.emptyList(), Collections.emptyList()); - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Collections.singletonList(fullSpec))); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Collections.singletonList(fullSpec)), new DirectExecutor()); subject.init(); assertEquals(0, subject.getPendingJobsWithNoDependenciesInCreatedOrder(10).size()); @@ -223,7 +224,7 @@ public class FastJobStorageTest { Collections.emptyList(), Collections.emptyList()); - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Collections.singletonList(fullSpec))); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Collections.singletonList(fullSpec)), new DirectExecutor()); subject.init(); assertEquals(0, subject.getPendingJobsWithNoDependenciesInCreatedOrder(0).size()); @@ -239,7 +240,7 @@ public class FastJobStorageTest { Collections.singletonList(new DependencySpec("2", "1"))); - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2))); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2)), new DirectExecutor()); subject.init(); assertEquals(0, subject.getPendingJobsWithNoDependenciesInCreatedOrder(0).size()); @@ -251,7 +252,7 @@ public class FastJobStorageTest { Collections.emptyList(), Collections.emptyList()); - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Collections.singletonList(fullSpec))); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Collections.singletonList(fullSpec)), new DirectExecutor()); subject.init(); assertEquals(1, subject.getPendingJobsWithNoDependenciesInCreatedOrder(10).size()); @@ -267,7 +268,7 @@ public class FastJobStorageTest { Collections.emptyList()); - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2))); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2)), new DirectExecutor()); subject.init(); assertEquals(2, subject.getPendingJobsWithNoDependenciesInCreatedOrder(10).size()); @@ -283,7 +284,7 @@ public class FastJobStorageTest { Collections.emptyList()); - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2))); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2)), new DirectExecutor()); subject.init(); List jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10); @@ -302,7 +303,7 @@ public class FastJobStorageTest { Collections.emptyList()); - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2))); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2)), new DirectExecutor()); subject.init(); List jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10); @@ -320,7 +321,7 @@ public class FastJobStorageTest { Collections.emptyList(), Collections.emptyList()); - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(plainSpec, migrationSpec))); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(plainSpec, migrationSpec)), new DirectExecutor()); subject.init(); List jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10); @@ -338,7 +339,7 @@ public class FastJobStorageTest { Collections.emptyList(), Collections.emptyList()); - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(plainSpec, migrationSpec))); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(plainSpec, migrationSpec)), new DirectExecutor()); subject.init(); List jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10); @@ -355,7 +356,7 @@ public class FastJobStorageTest { Collections.emptyList(), Collections.emptyList()); - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(migrationSpec1, migrationSpec2))); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(migrationSpec1, migrationSpec2)), new DirectExecutor()); subject.init(); List jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10); @@ -372,7 +373,7 @@ public class FastJobStorageTest { Collections.emptyList(), Collections.emptyList()); - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(migrationSpec1, migrationSpec2))); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(migrationSpec1, migrationSpec2)), new DirectExecutor()); subject.init(); List jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10); @@ -390,7 +391,7 @@ public class FastJobStorageTest { Collections.emptyList(), Collections.emptyList()); - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(migrationSpec1, migrationSpec2))); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(migrationSpec1, migrationSpec2)), new DirectExecutor()); subject.init(); List jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10); @@ -401,7 +402,7 @@ public class FastJobStorageTest { @Test public void deleteJobs_writesToDatabase() { JobDatabase database = noopDatabase(); - FastJobStorage subject = new FastJobStorage(database); + FastJobStorage subject = new FastJobStorage(database, new DirectExecutor()); List ids = Arrays.asList("1", "2"); subject.deleteJobs(ids); @@ -411,7 +412,7 @@ public class FastJobStorageTest { @Test public void deleteJobs_deletesAllRelevantPieces() { - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS), new DirectExecutor()); subject.init(); subject.deleteJobs(Collections.singletonList("id1")); @@ -430,7 +431,7 @@ public class FastJobStorageTest { @Test public void getDependencySpecsThatDependOnJob_startOfChain() { - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS), new DirectExecutor()); subject.init(); @@ -443,7 +444,7 @@ public class FastJobStorageTest { @Test public void getDependencySpecsThatDependOnJob_midChain() { - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS), new DirectExecutor()); subject.init(); @@ -455,7 +456,7 @@ public class FastJobStorageTest { @Test public void getDependencySpecsThatDependOnJob_endOfChain() { - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS), new DirectExecutor()); subject.init(); @@ -466,7 +467,7 @@ public class FastJobStorageTest { @Test public void getJobsInQueue_empty() { - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS), new DirectExecutor()); subject.init(); @@ -477,7 +478,7 @@ public class FastJobStorageTest { @Test public void getJobsInQueue_singleJob() { - FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS), new DirectExecutor()); subject.init();