Various JobManager performance improvements.

This commit is contained in:
Greyson Parrelli 2020-06-22 18:34:31 -07:00
parent bf919207ed
commit f8a0988e5f
9 changed files with 242 additions and 90 deletions

View File

@ -134,7 +134,7 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr
.setJobFactories(JobManagerFactories.getJobFactories(context)) .setJobFactories(JobManagerFactories.getJobFactories(context))
.setConstraintFactories(JobManagerFactories.getConstraintFactories(context)) .setConstraintFactories(JobManagerFactories.getConstraintFactories(context))
.setConstraintObservers(JobManagerFactories.getConstraintObservers(context)) .setConstraintObservers(JobManagerFactories.getConstraintObservers(context))
.setJobStorage(new FastJobStorage(DatabaseFactory.getJobDatabase(context))) .setJobStorage(new FastJobStorage(DatabaseFactory.getJobDatabase(context), SignalExecutors.newCachedSingleThreadExecutor("signal-fast-job-storage")))
.setJobMigrator(new JobMigrator(TextSecurePreferences.getJobManagerVersion(context), JobManager.CURRENT_VERSION, JobManagerFactories.getJobMigrations(context))) .setJobMigrator(new JobMigrator(TextSecurePreferences.getJobManagerVersion(context), JobManager.CURRENT_VERSION, JobManagerFactories.getJobMigrations(context)))
.build()); .build());
} }

View File

@ -77,6 +77,11 @@ class JobController {
notifyAll(); notifyAll();
} }
@WorkerThread
synchronized void flush() {
jobStorage.flush();
}
@WorkerThread @WorkerThread
synchronized void submitNewJobChain(@NonNull List<List<Job>> chain) { synchronized void submitNewJobChain(@NonNull List<List<Job>> chain) {
chain = Stream.of(chain).filterNot(List::isEmpty).toList(); chain = Stream.of(chain).filterNot(List::isEmpty).toList();

View File

@ -3,6 +3,8 @@ package org.thoughtcrime.securesms.jobmanager;
import android.app.Application; import android.app.Application;
import android.content.Intent; import android.content.Intent;
import android.os.Build; import android.os.Build;
import androidx.annotation.GuardedBy;
import androidx.annotation.NonNull; import androidx.annotation.NonNull;
import androidx.annotation.Nullable; import androidx.annotation.Nullable;
import androidx.annotation.WorkerThread; import androidx.annotation.WorkerThread;
@ -14,6 +16,8 @@ import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage;
import org.thoughtcrime.securesms.logging.Log; import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.util.Debouncer; import org.thoughtcrime.securesms.util.Debouncer;
import org.thoughtcrime.securesms.util.TextSecurePreferences; import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.thoughtcrime.securesms.util.Util;
import org.thoughtcrime.securesms.util.concurrent.NonMainThreadExecutor;
import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.libsignal.util.guava.Optional;
import java.util.ArrayList; import java.util.ArrayList;
@ -26,9 +30,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -41,18 +43,21 @@ public class JobManager implements ConstraintObserver.Notifier {
public static final int CURRENT_VERSION = 7; public static final int CURRENT_VERSION = 7;
private final Application application; private final Application application;
private final Configuration configuration; private final Configuration configuration;
private final ExecutorService executor; private final Executor executor;
private final JobController jobController; private final JobController jobController;
private final JobTracker jobTracker; private final JobTracker jobTracker;
@GuardedBy("emptyQueueListeners")
private final Set<EmptyQueueListener> emptyQueueListeners = new CopyOnWriteArraySet<>(); private final Set<EmptyQueueListener> emptyQueueListeners = new CopyOnWriteArraySet<>();
private volatile boolean initialized;
public JobManager(@NonNull Application application, @NonNull Configuration configuration) { public JobManager(@NonNull Application application, @NonNull Configuration configuration) {
this.application = application; this.application = application;
this.configuration = configuration; this.configuration = configuration;
this.executor = configuration.getExecutorFactory().newSingleThreadExecutor("signal-JobManager"); this.executor = new NonMainThreadExecutor(configuration.getExecutorFactory().newSingleThreadExecutor("signal-JobManager"));
this.jobTracker = configuration.getJobTracker(); this.jobTracker = configuration.getJobTracker();
this.jobController = new JobController(application, this.jobController = new JobController(application,
configuration.getJobStorage(), configuration.getJobStorage(),
@ -66,25 +71,30 @@ public class JobManager implements ConstraintObserver.Notifier {
this::onEmptyQueue); this::onEmptyQueue);
executor.execute(() -> { executor.execute(() -> {
if (WorkManagerMigrator.needsMigration(application)) { synchronized (this) {
Log.i(TAG, "Detected an old WorkManager database. Migrating."); if (WorkManagerMigrator.needsMigration(application)) {
WorkManagerMigrator.migrate(application, configuration.getJobStorage(), configuration.getDataSerializer()); Log.i(TAG, "Detected an old WorkManager database. Migrating.");
} WorkManagerMigrator.migrate(application, configuration.getJobStorage(), configuration.getDataSerializer());
}
JobStorage jobStorage = configuration.getJobStorage(); JobStorage jobStorage = configuration.getJobStorage();
jobStorage.init(); jobStorage.init();
int latestVersion = configuration.getJobMigrator().migrate(jobStorage, configuration.getDataSerializer()); int latestVersion = configuration.getJobMigrator().migrate(jobStorage, configuration.getDataSerializer());
TextSecurePreferences.setJobManagerVersion(application, latestVersion); TextSecurePreferences.setJobManagerVersion(application, latestVersion);
jobController.init(); jobController.init();
for (ConstraintObserver constraintObserver : configuration.getConstraintObservers()) { for (ConstraintObserver constraintObserver : configuration.getConstraintObservers()) {
constraintObserver.register(this); constraintObserver.register(this);
} }
if (Build.VERSION.SDK_INT < 26) { if (Build.VERSION.SDK_INT < 26) {
application.startService(new Intent(application, KeepAliveService.class)); application.startService(new Intent(application, KeepAliveService.class));
}
initialized = true;
notifyAll();
} }
}); });
} }
@ -93,11 +103,11 @@ public class JobManager implements ConstraintObserver.Notifier {
* Begins the execution of jobs. * Begins the execution of jobs.
*/ */
public void beginJobLoop() { public void beginJobLoop() {
executor.execute(() -> { runOnExecutor(()-> {
for (int i = 0; i < configuration.getJobThreadCount(); i++) { for (int i = 0; i < configuration.getJobThreadCount(); i++) {
new JobRunner(application, i + 1, jobController).start(); new JobRunner(application, i + 1, jobController).start();
} }
wakeUp(); jobController.wakeUp();
}); });
} }
@ -138,9 +148,9 @@ public class JobManager implements ConstraintObserver.Notifier {
public void add(@NonNull Job job, @NonNull Collection<String> dependsOn) { public void add(@NonNull Job job, @NonNull Collection<String> dependsOn) {
jobTracker.onStateChange(job, JobTracker.JobState.PENDING); jobTracker.onStateChange(job, JobTracker.JobState.PENDING);
executor.execute(() -> { runOnExecutor(() -> {
jobController.submitJobWithExistingDependencies(job, dependsOn, null); jobController.submitJobWithExistingDependencies(job, dependsOn, null);
wakeUp(); jobController.wakeUp();
}); });
} }
@ -151,9 +161,9 @@ public class JobManager implements ConstraintObserver.Notifier {
public void add(@NonNull Job job, @Nullable String dependsOnQueue) { public void add(@NonNull Job job, @Nullable String dependsOnQueue) {
jobTracker.onStateChange(job, JobTracker.JobState.PENDING); jobTracker.onStateChange(job, JobTracker.JobState.PENDING);
executor.execute(() -> { runOnExecutor(() -> {
jobController.submitJobWithExistingDependencies(job, Collections.emptyList(), dependsOnQueue); jobController.submitJobWithExistingDependencies(job, Collections.emptyList(), dependsOnQueue);
wakeUp(); jobController.wakeUp();
}); });
} }
@ -164,9 +174,9 @@ public class JobManager implements ConstraintObserver.Notifier {
public void add(@NonNull Job job, @NonNull Collection<String> dependsOn, @Nullable String dependsOnQueue) { public void add(@NonNull Job job, @NonNull Collection<String> dependsOn, @Nullable String dependsOnQueue) {
jobTracker.onStateChange(job, JobTracker.JobState.PENDING); jobTracker.onStateChange(job, JobTracker.JobState.PENDING);
executor.execute(() -> { runOnExecutor(() -> {
jobController.submitJobWithExistingDependencies(job, dependsOn, dependsOnQueue); jobController.submitJobWithExistingDependencies(job, dependsOn, dependsOnQueue);
wakeUp(); jobController.wakeUp();
}); });
} }
@ -195,14 +205,14 @@ public class JobManager implements ConstraintObserver.Notifier {
* moment. Just like a normal failure, all later jobs in the same chain will also be failed. * moment. Just like a normal failure, all later jobs in the same chain will also be failed.
*/ */
public void cancel(@NonNull String id) { public void cancel(@NonNull String id) {
executor.execute(() -> jobController.cancelJob(id)); runOnExecutor(() -> jobController.cancelJob(id));
} }
/** /**
* Cancels all jobs in the specified queue. See {@link #cancel(String)} for details. * Cancels all jobs in the specified queue. See {@link #cancel(String)} for details.
*/ */
public void cancelAllInQueue(@NonNull String queue) { public void cancelAllInQueue(@NonNull String queue) {
executor.execute(() -> jobController.cancelAllInQueue(queue)); runOnExecutor(() -> jobController.cancelAllInQueue(queue));
} }
/** /**
@ -247,10 +257,22 @@ public class JobManager implements ConstraintObserver.Notifier {
*/ */
@WorkerThread @WorkerThread
public @NonNull String getDebugInfo() { public @NonNull String getDebugInfo() {
Future<String> result = executor.submit(jobController::getDebugInfo); AtomicReference<String> result = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
runOnExecutor(() -> {
result.set(jobController.getDebugInfo());
latch.countDown();
});
try { try {
return result.get(); boolean finished = latch.await(10, TimeUnit.SECONDS);
} catch (ExecutionException | InterruptedException e) { if (finished) {
return result.get();
} else {
return "Timed out waiting for Job info.";
}
} catch (InterruptedException e) {
Log.w(TAG, "Failed to retrieve Job info.", e); Log.w(TAG, "Failed to retrieve Job info.", e);
return "Failed to retrieve Job info."; return "Failed to retrieve Job info.";
} }
@ -260,8 +282,10 @@ public class JobManager implements ConstraintObserver.Notifier {
* Adds a listener that will be notified when the job queue has been drained. * Adds a listener that will be notified when the job queue has been drained.
*/ */
void addOnEmptyQueueListener(@NonNull EmptyQueueListener listener) { void addOnEmptyQueueListener(@NonNull EmptyQueueListener listener) {
executor.execute(() -> { runOnExecutor(() -> {
emptyQueueListeners.add(listener); synchronized (emptyQueueListeners) {
emptyQueueListeners.add(listener);
}
}); });
} }
@ -269,8 +293,10 @@ public class JobManager implements ConstraintObserver.Notifier {
* Removes a listener that was added via {@link #addOnEmptyQueueListener(EmptyQueueListener)}. * Removes a listener that was added via {@link #addOnEmptyQueueListener(EmptyQueueListener)}.
*/ */
void removeOnEmptyQueueListener(@NonNull EmptyQueueListener listener) { void removeOnEmptyQueueListener(@NonNull EmptyQueueListener listener) {
executor.execute(() -> { runOnExecutor(() -> {
emptyQueueListeners.remove(listener); synchronized (emptyQueueListeners) {
emptyQueueListeners.remove(listener);
}
}); });
} }
@ -280,11 +306,31 @@ public class JobManager implements ConstraintObserver.Notifier {
wakeUp(); wakeUp();
} }
/**
* Blocks until all pending operations are finished.
*/
@WorkerThread
public void flush() {
CountDownLatch latch = new CountDownLatch(1);
runOnExecutor(() -> {
jobController.flush();
latch.countDown();
});
try {
latch.await();
Log.i(TAG, "Successfully flushed.");
} catch (InterruptedException e) {
Log.w(TAG, "Failed to finish flushing.", e);
}
}
/** /**
* Pokes the system to take another pass at the job queue. * Pokes the system to take another pass at the job queue.
*/ */
void wakeUp() { void wakeUp() {
executor.execute(jobController::wakeUp); runOnExecutor(jobController::wakeUp);
} }
private void enqueueChain(@NonNull Chain chain) { private void enqueueChain(@NonNull Chain chain) {
@ -294,20 +340,46 @@ public class JobManager implements ConstraintObserver.Notifier {
} }
} }
executor.execute(() -> { runOnExecutor(() -> {
jobController.submitNewJobChain(chain.getJobListChain()); jobController.submitNewJobChain(chain.getJobListChain());
wakeUp(); jobController.wakeUp();
}); });
} }
private void onEmptyQueue() { private void onEmptyQueue() {
executor.execute(() -> { runOnExecutor(() -> {
for (EmptyQueueListener listener : emptyQueueListeners) { synchronized (emptyQueueListeners) {
listener.onQueueEmpty(); for (EmptyQueueListener listener : emptyQueueListeners) {
listener.onQueueEmpty();
}
} }
}); });
} }
/**
* Anything that you want to ensure happens off of the main thread and after initialization, run
* it through here.
*/
private void runOnExecutor(@NonNull Runnable runnable) {
executor.execute(() -> {
waitUntilInitialized();
runnable.run();
});
}
private void waitUntilInitialized() {
if (!initialized) {
Log.i(TAG, "Waiting for initialization...");
synchronized (this) {
while (!initialized) {
Util.wait(this, 0);
}
}
Log.i(TAG, "Initialization complete.");
}
}
public interface EmptyQueueListener { public interface EmptyQueueListener {
void onQueueEmpty(); void onQueueEmpty();
} }

View File

@ -13,10 +13,18 @@ public class WebsocketDrainedConstraintObserver implements ConstraintObserver {
private static final String REASON = WebsocketDrainedConstraintObserver.class.getSimpleName(); private static final String REASON = WebsocketDrainedConstraintObserver.class.getSimpleName();
@Override private volatile Notifier notifier;
public void register(@NonNull Notifier notifier) {
public WebsocketDrainedConstraintObserver() {
ApplicationDependencies.getInitialMessageRetriever().addListener(() -> { ApplicationDependencies.getInitialMessageRetriever().addListener(() -> {
notifier.onConstraintMet(REASON); if (notifier != null) {
notifier.onConstraintMet(REASON);
}
}); });
} }
@Override
public void register(@NonNull Notifier notifier) {
this.notifier = notifier;
}
} }

View File

@ -11,6 +11,9 @@ public interface JobStorage {
@WorkerThread @WorkerThread
void init(); void init();
@WorkerThread
void flush();
@WorkerThread @WorkerThread
void insertJobs(@NonNull List<FullSpec> fullSpecs); void insertJobs(@NonNull List<FullSpec> fullSpecs);

View File

@ -13,6 +13,7 @@ import org.thoughtcrime.securesms.jobmanager.persistence.DependencySpec;
import org.thoughtcrime.securesms.jobmanager.persistence.FullSpec; import org.thoughtcrime.securesms.jobmanager.persistence.FullSpec;
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec; import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec;
import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage; import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.util.Util; import org.thoughtcrime.securesms.util.Util;
import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.libsignal.util.guava.Optional;
@ -26,17 +27,23 @@ import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
public class FastJobStorage implements JobStorage { public class FastJobStorage implements JobStorage {
private static final String TAG = Log.tag(FastJobStorage.class);
private final JobDatabase jobDatabase; private final JobDatabase jobDatabase;
private final Executor serialExecutor;
private final List<JobSpec> jobs; private final List<JobSpec> jobs;
private final Map<String, List<ConstraintSpec>> constraintsByJobId; private final Map<String, List<ConstraintSpec>> constraintsByJobId;
private final Map<String, List<DependencySpec>> dependenciesByJobId; private final Map<String, List<DependencySpec>> dependenciesByJobId;
public FastJobStorage(@NonNull JobDatabase jobDatabase) { public FastJobStorage(@NonNull JobDatabase jobDatabase, @NonNull Executor serialExecutor) {
this.jobDatabase = jobDatabase; this.jobDatabase = jobDatabase;
this.serialExecutor = serialExecutor;
this.jobs = new ArrayList<>(); this.jobs = new ArrayList<>();
this.constraintsByJobId = new HashMap<>(); this.constraintsByJobId = new HashMap<>();
this.dependenciesByJobId = new HashMap<>(); this.dependenciesByJobId = new HashMap<>();
@ -63,9 +70,24 @@ public class FastJobStorage implements JobStorage {
} }
} }
@Override
public synchronized void flush() {
CountDownLatch latch = new CountDownLatch(1);
serialExecutor.execute(latch::countDown);
try {
latch.await();
} catch (InterruptedException e) {
Log.w(TAG, "Interrupted while waiting to flush!", e);
}
}
@Override @Override
public synchronized void insertJobs(@NonNull List<FullSpec> fullSpecs) { public synchronized void insertJobs(@NonNull List<FullSpec> fullSpecs) {
jobDatabase.insertJobs(fullSpecs); serialExecutor.execute(() -> {
jobDatabase.insertJobs(fullSpecs);
});
for (FullSpec fullSpec : fullSpecs) { for (FullSpec fullSpec : fullSpecs) {
jobs.add(fullSpec.getJobSpec()); jobs.add(fullSpec.getJobSpec());
@ -146,7 +168,9 @@ public class FastJobStorage implements JobStorage {
@Override @Override
public synchronized void updateJobRunningState(@NonNull String id, boolean isRunning) { public synchronized void updateJobRunningState(@NonNull String id, boolean isRunning) {
jobDatabase.updateJobRunningState(id, isRunning); serialExecutor.execute(() -> {
jobDatabase.updateJobRunningState(id, isRunning);
});
ListIterator<JobSpec> iter = jobs.listIterator(); ListIterator<JobSpec> iter = jobs.listIterator();
@ -173,7 +197,9 @@ public class FastJobStorage implements JobStorage {
@Override @Override
public synchronized void updateJobAfterRetry(@NonNull String id, boolean isRunning, int runAttempt, long nextRunAttemptTime, @NonNull String serializedData) { public synchronized void updateJobAfterRetry(@NonNull String id, boolean isRunning, int runAttempt, long nextRunAttemptTime, @NonNull String serializedData) {
jobDatabase.updateJobAfterRetry(id, isRunning, runAttempt, nextRunAttemptTime, serializedData); serialExecutor.execute(() -> {
jobDatabase.updateJobAfterRetry(id, isRunning, runAttempt, nextRunAttemptTime, serializedData);
});
ListIterator<JobSpec> iter = jobs.listIterator(); ListIterator<JobSpec> iter = jobs.listIterator();
@ -200,8 +226,9 @@ public class FastJobStorage implements JobStorage {
@Override @Override
public synchronized void updateAllJobsToBePending() { public synchronized void updateAllJobsToBePending() {
jobDatabase.updateAllJobsToBePending(); serialExecutor.execute(() -> {
jobDatabase.updateAllJobsToBePending();
});
ListIterator<JobSpec> iter = jobs.listIterator(); ListIterator<JobSpec> iter = jobs.listIterator();
while (iter.hasNext()) { while (iter.hasNext()) {
@ -225,7 +252,9 @@ public class FastJobStorage implements JobStorage {
@Override @Override
public void updateJobs(@NonNull List<JobSpec> jobSpecs) { public void updateJobs(@NonNull List<JobSpec> jobSpecs) {
jobDatabase.updateJobs(jobSpecs); serialExecutor.execute(() -> {
jobDatabase.updateJobs(jobSpecs);
});
Map<String, JobSpec> updates = Stream.of(jobSpecs).collect(Collectors.toMap(JobSpec::getId)); Map<String, JobSpec> updates = Stream.of(jobSpecs).collect(Collectors.toMap(JobSpec::getId));
ListIterator<JobSpec> iter = jobs.listIterator(); ListIterator<JobSpec> iter = jobs.listIterator();
@ -247,7 +276,10 @@ public class FastJobStorage implements JobStorage {
@Override @Override
public synchronized void deleteJobs(@NonNull List<String> jobIds) { public synchronized void deleteJobs(@NonNull List<String> jobIds) {
jobDatabase.deleteJobs(jobIds); serialExecutor.execute(() -> {
jobDatabase.deleteJobs(jobIds);
});
Set<String> deleteIds = new HashSet<>(jobIds); Set<String> deleteIds = new HashSet<>(jobIds);

View File

@ -2,6 +2,7 @@ package org.thoughtcrime.securesms.logging;
import androidx.annotation.NonNull; import androidx.annotation.NonNull;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.keyvalue.SignalStore; import org.thoughtcrime.securesms.keyvalue.SignalStore;
public class SignalUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { public class SignalUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
@ -19,6 +20,7 @@ public class SignalUncaughtExceptionHandler implements Thread.UncaughtExceptionH
Log.e(TAG, "", e); Log.e(TAG, "", e);
SignalStore.blockUntilAllWritesFinished(); SignalStore.blockUntilAllWritesFinished();
Log.blockUntilAllWritesFinished(); Log.blockUntilAllWritesFinished();
ApplicationDependencies.getJobManager().flush();
originalHandler.uncaughtException(t, e); originalHandler.uncaughtException(t, e);
} }
} }

View File

@ -0,0 +1,29 @@
package org.thoughtcrime.securesms.util.concurrent;
import androidx.annotation.NonNull;
import org.thoughtcrime.securesms.util.Util;
import java.util.concurrent.Executor;
/**
* If submitted on the main thread, the task will be executed on the provided background executor.
* Otherwise, the task will be run on the calling thread.
*/
public final class NonMainThreadExecutor implements Executor {
private final Executor backgroundExecutor;
public NonMainThreadExecutor(@NonNull Executor backgroundExecutor) {
this.backgroundExecutor = backgroundExecutor;
}
@Override
public void execute(@NonNull Runnable runnable) {
if (Util.isMainThread()) {
backgroundExecutor.execute(runnable);
} else {
runnable.run();
}
}
}

View File

@ -13,6 +13,7 @@ import org.thoughtcrime.securesms.jobmanager.persistence.ConstraintSpec;
import org.thoughtcrime.securesms.jobmanager.persistence.DependencySpec; import org.thoughtcrime.securesms.jobmanager.persistence.DependencySpec;
import org.thoughtcrime.securesms.jobmanager.persistence.FullSpec; import org.thoughtcrime.securesms.jobmanager.persistence.FullSpec;
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec; import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec;
import org.thoughtcrime.securesms.testutil.DirectExecutor;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -33,7 +34,7 @@ public class FastJobStorageTest {
@Test @Test
public void init_allStoredDataAvailable() { public void init_allStoredDataAvailable() {
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS), new DirectExecutor());
subject.init(); subject.init();
@ -45,7 +46,7 @@ public class FastJobStorageTest {
@Test @Test
public void insertJobs_writesToDatabase() { public void insertJobs_writesToDatabase() {
JobDatabase database = noopDatabase(); JobDatabase database = noopDatabase();
FastJobStorage subject = new FastJobStorage(database); FastJobStorage subject = new FastJobStorage(database, new DirectExecutor());
subject.insertJobs(DataSet1.FULL_SPECS); subject.insertJobs(DataSet1.FULL_SPECS);
@ -54,7 +55,7 @@ public class FastJobStorageTest {
@Test @Test
public void insertJobs_dataCanBeFound() { public void insertJobs_dataCanBeFound() {
FastJobStorage subject = new FastJobStorage(noopDatabase()); FastJobStorage subject = new FastJobStorage(noopDatabase(), new DirectExecutor());
subject.insertJobs(DataSet1.FULL_SPECS); subject.insertJobs(DataSet1.FULL_SPECS);
@ -65,7 +66,7 @@ public class FastJobStorageTest {
@Test @Test
public void insertJobs_individualJobCanBeFound() { public void insertJobs_individualJobCanBeFound() {
FastJobStorage subject = new FastJobStorage(noopDatabase()); FastJobStorage subject = new FastJobStorage(noopDatabase(), new DirectExecutor());
subject.insertJobs(DataSet1.FULL_SPECS); subject.insertJobs(DataSet1.FULL_SPECS);
@ -76,7 +77,7 @@ public class FastJobStorageTest {
@Test @Test
public void updateAllJobsToBePending_writesToDatabase() { public void updateAllJobsToBePending_writesToDatabase() {
JobDatabase database = noopDatabase(); JobDatabase database = noopDatabase();
FastJobStorage subject = new FastJobStorage(database); FastJobStorage subject = new FastJobStorage(database, new DirectExecutor());
subject.updateAllJobsToBePending(); subject.updateAllJobsToBePending();
@ -92,7 +93,7 @@ public class FastJobStorageTest {
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()); Collections.emptyList());
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2))); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2)), new DirectExecutor());
subject.init(); subject.init();
subject.updateAllJobsToBePending(); subject.updateAllJobsToBePending();
@ -104,7 +105,7 @@ public class FastJobStorageTest {
@Test @Test
public void updateJobs_writesToDatabase() { public void updateJobs_writesToDatabase() {
JobDatabase database = noopDatabase(); JobDatabase database = noopDatabase();
FastJobStorage subject = new FastJobStorage(database); FastJobStorage subject = new FastJobStorage(database, new DirectExecutor());
List<JobSpec> jobs = Collections.emptyList(); List<JobSpec> jobs = Collections.emptyList();
subject.updateJobs(jobs); subject.updateJobs(jobs);
@ -125,7 +126,7 @@ public class FastJobStorageTest {
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()); Collections.emptyList());
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2, fullSpec3))); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2, fullSpec3)), new DirectExecutor());
JobSpec update1 = new JobSpec("1", "g1", "q1", 2, 2, 2, 2, 2, 2, 2, "abc", null, true); JobSpec update1 = new JobSpec("1", "g1", "q1", 2, 2, 2, 2, 2, 2, 2, "abc", null, true);
JobSpec update2 = new JobSpec("2", "g2", "q2", 3, 3, 3, 3, 3, 3, 3, "def", "ghi", true); JobSpec update2 = new JobSpec("2", "g2", "q2", 3, 3, 3, 3, 3, 3, 3, "def", "ghi", true);
@ -141,7 +142,7 @@ public class FastJobStorageTest {
@Test @Test
public void updateJobRunningState_writesToDatabase() { public void updateJobRunningState_writesToDatabase() {
JobDatabase database = noopDatabase(); JobDatabase database = noopDatabase();
FastJobStorage subject = new FastJobStorage(database); FastJobStorage subject = new FastJobStorage(database, new DirectExecutor());
subject.updateJobRunningState("1", true); subject.updateJobRunningState("1", true);
@ -150,7 +151,7 @@ public class FastJobStorageTest {
@Test @Test
public void updateJobRunningState_stateUpdated() { public void updateJobRunningState_stateUpdated() {
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS), new DirectExecutor());
subject.init(); subject.init();
subject.updateJobRunningState(DataSet1.JOB_1.getId(), true); subject.updateJobRunningState(DataSet1.JOB_1.getId(), true);
@ -163,7 +164,7 @@ public class FastJobStorageTest {
@Test @Test
public void updateJobAfterRetry_writesToDatabase() { public void updateJobAfterRetry_writesToDatabase() {
JobDatabase database = noopDatabase(); JobDatabase database = noopDatabase();
FastJobStorage subject = new FastJobStorage(database); FastJobStorage subject = new FastJobStorage(database, new DirectExecutor());
subject.updateJobAfterRetry("1", true, 1, 10, "a"); subject.updateJobAfterRetry("1", true, 1, 10, "a");
@ -176,7 +177,7 @@ public class FastJobStorageTest {
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()); Collections.emptyList());
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Collections.singletonList(fullSpec))); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Collections.singletonList(fullSpec)), new DirectExecutor());
subject.init(); subject.init();
subject.updateJobAfterRetry("1", false, 1, 10, "a"); subject.updateJobAfterRetry("1", false, 1, 10, "a");
@ -199,7 +200,7 @@ public class FastJobStorageTest {
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()); Collections.emptyList());
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2))); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2)), new DirectExecutor());
subject.init(); subject.init();
assertEquals(0, subject.getPendingJobsWithNoDependenciesInCreatedOrder(1).size()); assertEquals(0, subject.getPendingJobsWithNoDependenciesInCreatedOrder(1).size());
@ -211,7 +212,7 @@ public class FastJobStorageTest {
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()); Collections.emptyList());
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Collections.singletonList(fullSpec))); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Collections.singletonList(fullSpec)), new DirectExecutor());
subject.init(); subject.init();
assertEquals(0, subject.getPendingJobsWithNoDependenciesInCreatedOrder(10).size()); assertEquals(0, subject.getPendingJobsWithNoDependenciesInCreatedOrder(10).size());
@ -223,7 +224,7 @@ public class FastJobStorageTest {
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()); Collections.emptyList());
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Collections.singletonList(fullSpec))); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Collections.singletonList(fullSpec)), new DirectExecutor());
subject.init(); subject.init();
assertEquals(0, subject.getPendingJobsWithNoDependenciesInCreatedOrder(0).size()); assertEquals(0, subject.getPendingJobsWithNoDependenciesInCreatedOrder(0).size());
@ -239,7 +240,7 @@ public class FastJobStorageTest {
Collections.singletonList(new DependencySpec("2", "1"))); Collections.singletonList(new DependencySpec("2", "1")));
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2))); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2)), new DirectExecutor());
subject.init(); subject.init();
assertEquals(0, subject.getPendingJobsWithNoDependenciesInCreatedOrder(0).size()); assertEquals(0, subject.getPendingJobsWithNoDependenciesInCreatedOrder(0).size());
@ -251,7 +252,7 @@ public class FastJobStorageTest {
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()); Collections.emptyList());
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Collections.singletonList(fullSpec))); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Collections.singletonList(fullSpec)), new DirectExecutor());
subject.init(); subject.init();
assertEquals(1, subject.getPendingJobsWithNoDependenciesInCreatedOrder(10).size()); assertEquals(1, subject.getPendingJobsWithNoDependenciesInCreatedOrder(10).size());
@ -267,7 +268,7 @@ public class FastJobStorageTest {
Collections.emptyList()); Collections.emptyList());
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2))); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2)), new DirectExecutor());
subject.init(); subject.init();
assertEquals(2, subject.getPendingJobsWithNoDependenciesInCreatedOrder(10).size()); assertEquals(2, subject.getPendingJobsWithNoDependenciesInCreatedOrder(10).size());
@ -283,7 +284,7 @@ public class FastJobStorageTest {
Collections.emptyList()); Collections.emptyList());
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2))); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2)), new DirectExecutor());
subject.init(); subject.init();
List<JobSpec> jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10); List<JobSpec> jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10);
@ -302,7 +303,7 @@ public class FastJobStorageTest {
Collections.emptyList()); Collections.emptyList());
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2))); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2)), new DirectExecutor());
subject.init(); subject.init();
List<JobSpec> jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10); List<JobSpec> jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10);
@ -320,7 +321,7 @@ public class FastJobStorageTest {
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()); Collections.emptyList());
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(plainSpec, migrationSpec))); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(plainSpec, migrationSpec)), new DirectExecutor());
subject.init(); subject.init();
List<JobSpec> jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10); List<JobSpec> jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10);
@ -338,7 +339,7 @@ public class FastJobStorageTest {
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()); Collections.emptyList());
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(plainSpec, migrationSpec))); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(plainSpec, migrationSpec)), new DirectExecutor());
subject.init(); subject.init();
List<JobSpec> jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10); List<JobSpec> jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10);
@ -355,7 +356,7 @@ public class FastJobStorageTest {
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()); Collections.emptyList());
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(migrationSpec1, migrationSpec2))); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(migrationSpec1, migrationSpec2)), new DirectExecutor());
subject.init(); subject.init();
List<JobSpec> jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10); List<JobSpec> jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10);
@ -372,7 +373,7 @@ public class FastJobStorageTest {
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()); Collections.emptyList());
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(migrationSpec1, migrationSpec2))); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(migrationSpec1, migrationSpec2)), new DirectExecutor());
subject.init(); subject.init();
List<JobSpec> jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10); List<JobSpec> jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10);
@ -390,7 +391,7 @@ public class FastJobStorageTest {
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()); Collections.emptyList());
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(migrationSpec1, migrationSpec2))); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(migrationSpec1, migrationSpec2)), new DirectExecutor());
subject.init(); subject.init();
List<JobSpec> jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10); List<JobSpec> jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10);
@ -401,7 +402,7 @@ public class FastJobStorageTest {
@Test @Test
public void deleteJobs_writesToDatabase() { public void deleteJobs_writesToDatabase() {
JobDatabase database = noopDatabase(); JobDatabase database = noopDatabase();
FastJobStorage subject = new FastJobStorage(database); FastJobStorage subject = new FastJobStorage(database, new DirectExecutor());
List<String> ids = Arrays.asList("1", "2"); List<String> ids = Arrays.asList("1", "2");
subject.deleteJobs(ids); subject.deleteJobs(ids);
@ -411,7 +412,7 @@ public class FastJobStorageTest {
@Test @Test
public void deleteJobs_deletesAllRelevantPieces() { public void deleteJobs_deletesAllRelevantPieces() {
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS), new DirectExecutor());
subject.init(); subject.init();
subject.deleteJobs(Collections.singletonList("id1")); subject.deleteJobs(Collections.singletonList("id1"));
@ -430,7 +431,7 @@ public class FastJobStorageTest {
@Test @Test
public void getDependencySpecsThatDependOnJob_startOfChain() { public void getDependencySpecsThatDependOnJob_startOfChain() {
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS), new DirectExecutor());
subject.init(); subject.init();
@ -443,7 +444,7 @@ public class FastJobStorageTest {
@Test @Test
public void getDependencySpecsThatDependOnJob_midChain() { public void getDependencySpecsThatDependOnJob_midChain() {
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS), new DirectExecutor());
subject.init(); subject.init();
@ -455,7 +456,7 @@ public class FastJobStorageTest {
@Test @Test
public void getDependencySpecsThatDependOnJob_endOfChain() { public void getDependencySpecsThatDependOnJob_endOfChain() {
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS), new DirectExecutor());
subject.init(); subject.init();
@ -466,7 +467,7 @@ public class FastJobStorageTest {
@Test @Test
public void getJobsInQueue_empty() { public void getJobsInQueue_empty() {
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS), new DirectExecutor());
subject.init(); subject.init();
@ -477,7 +478,7 @@ public class FastJobStorageTest {
@Test @Test
public void getJobsInQueue_singleJob() { public void getJobsInQueue_singleJob() {
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS), new DirectExecutor());
subject.init(); subject.init();