Keep retrying message sends for 24 hours.

Previously, we retried based on a count. Now we've added the ability to
keep retrying for a specified time, using exponential backoff to
throttle attempts.
This commit is contained in:
Greyson Parrelli 2018-06-19 19:22:39 -07:00
parent cddb8082f4
commit a50edc3d25
18 changed files with 228 additions and 44 deletions

View File

@ -528,6 +528,7 @@
<receiver android:name=".service.ExpirationListener" />
<receiver android:name=".jobmanager.requirements.BackoffReceiver" />
<provider android:name=".providers.PartProvider"
android:grantUriPermissions="true"

View File

@ -33,6 +33,7 @@ public abstract class Job implements Serializable {
private transient long persistentId;
private transient int runIteration;
private transient long lastRunTime;
private transient PowerManager.WakeLock wakeLock;
public Job(JobParameters parameters) {
@ -45,7 +46,7 @@ public abstract class Job implements Serializable {
public boolean isRequirementsMet() {
for (Requirement requirement : parameters.getRequirements()) {
if (!requirement.isPresent()) return false;
if (!requirement.isPresent(this)) return false;
}
return true;
@ -71,6 +72,19 @@ public abstract class Job implements Serializable {
return parameters.getRetryCount();
}
public long getRetryUntil() {
return parameters.getRetryUntil();
}
public long getLastRunTime() {
return lastRunTime;
}
public void resetRunStats() {
runIteration = 0;
lastRunTime = 0;
}
public void setPersistentId(long persistentId) {
this.persistentId = persistentId;
}
@ -83,10 +97,6 @@ public abstract class Job implements Serializable {
return runIteration;
}
public void setRunIteration(int runIteration) {
this.runIteration = runIteration;
}
public boolean needsWakeLock() {
return parameters.needsWakeLock();
}
@ -103,6 +113,15 @@ public abstract class Job implements Serializable {
return this.wakeLock;
}
public void onRetry() {
runIteration++;
lastRunTime = System.currentTimeMillis();
for (Requirement requirement : parameters.getRequirements()) {
requirement.onRetry(this);
}
}
/**
* Called after a job has been added to the JobManager queue. If it's a persistent job,
* the state has been persisted to disk before this method is called.
@ -113,7 +132,7 @@ public abstract class Job implements Serializable {
* Called to actually execute the job.
* @throws Exception
*/
public abstract void onRun() throws Exception;
protected abstract void onRun() throws Exception;
/**
* If onRun() throws an exception, this method will be called to determine whether the

View File

@ -16,6 +16,7 @@
*/
package org.thoughtcrime.securesms.jobmanager;
import android.support.annotation.NonNull;
import android.util.Log;
import org.thoughtcrime.securesms.jobmanager.persistence.PersistentStorage;
@ -59,19 +60,16 @@ class JobConsumer extends Thread {
if (job.getWakeLock() != null && job.getWakeLockTimeout() == 0) {
job.getWakeLock().release();
}
}
if (job.getGroupId() != null) {
jobQueue.setGroupIdAvailable(job.getGroupId());
}
}
}
}
private JobResult runJob(Job job) {
int retryCount = job.getRetryCount();
int runIteration = job.getRunIteration();
for (;runIteration<retryCount;runIteration++) {
while (canRetry(job)) {
try {
job.onRun();
return JobResult.SUCCESS;
@ -81,8 +79,10 @@ class JobConsumer extends Thread {
throw (RuntimeException)exception;
} else if (!job.onShouldRetry(exception)) {
return JobResult.FAILURE;
} else if (!job.isRequirementsMet()) {
job.setRunIteration(runIteration+1);
}
job.onRetry();
if (!job.isRequirementsMet()) {
return JobResult.DEFERRED;
}
}
@ -91,4 +91,10 @@ class JobConsumer extends Thread {
return JobResult.FAILURE;
}
private boolean canRetry(@NonNull Job job) {
if (job.getRetryCount() > 0) {
return job.getRunIteration() < job.getRetryCount();
}
return System.currentTimeMillis() < job.getRetryUntil();
}
}

View File

@ -28,11 +28,14 @@ import java.util.concurrent.TimeUnit;
*/
public class JobParameters implements Serializable {
private static final long serialVersionUID = 4880456378402584584L;
private transient EncryptionKeys encryptionKeys;
private final List<Requirement> requirements;
private final boolean isPersistent;
private final int retryCount;
private final long retryUntil;
private final String groupId;
private final boolean wakeLock;
private final long wakeLockTimeout;
@ -40,7 +43,7 @@ public class JobParameters implements Serializable {
private JobParameters(List<Requirement> requirements,
boolean isPersistent, String groupId,
EncryptionKeys encryptionKeys,
int retryCount, boolean wakeLock,
int retryCount, long retryUntil, boolean wakeLock,
long wakeLockTimeout)
{
this.requirements = requirements;
@ -48,6 +51,7 @@ public class JobParameters implements Serializable {
this.groupId = groupId;
this.encryptionKeys = encryptionKeys;
this.retryCount = retryCount;
this.retryUntil = retryUntil;
this.wakeLock = wakeLock;
this.wakeLockTimeout = wakeLockTimeout;
}
@ -72,6 +76,10 @@ public class JobParameters implements Serializable {
return retryCount;
}
public long getRetryUntil() {
return retryUntil;
}
/**
* @return a builder used to construct JobParameters.
*/
@ -96,6 +104,7 @@ public class JobParameters implements Serializable {
private boolean isPersistent = false;
private EncryptionKeys encryptionKeys = null;
private int retryCount = 100;
private long retryDuration = 0;
private String groupId = null;
private boolean wakeLock = false;
private long wakeLockTimeout = 0;
@ -140,6 +149,13 @@ public class JobParameters implements Serializable {
*/
public Builder withRetryCount(int retryCount) {
this.retryCount = retryCount;
this.retryDuration = 0;
return this;
}
public Builder withRetryDuration(long duration) {
this.retryDuration = duration;
this.retryCount = 0;
return this;
}
@ -184,7 +200,7 @@ public class JobParameters implements Serializable {
* @return the JobParameters instance that describes a Job.
*/
public JobParameters create() {
return new JobParameters(requirements, isPersistent, groupId, encryptionKeys, retryCount, wakeLock, wakeLockTimeout);
return new JobParameters(requirements, isPersistent, groupId, encryptionKeys, retryCount, System.currentTimeMillis() + retryDuration, wakeLock, wakeLockTimeout);
}
}
}

View File

@ -16,15 +16,17 @@
*/
package org.thoughtcrime.securesms.jobmanager;
import java.util.HashSet;
import android.support.annotation.NonNull;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import java.util.Map;
class JobQueue {
private final Set<String> activeGroupIds = new HashSet<>();
private final Map<String, Job> activeGroupIds = new HashMap<>();
private final LinkedList<Job> jobQueue = new LinkedList<>();
synchronized void onRequirementStatusChanged() {
@ -33,14 +35,29 @@ class JobQueue {
synchronized void add(Job job) {
jobQueue.add(job);
processJobAddition(job);
notifyAll();
}
synchronized void addAll(List<Job> jobs) {
jobQueue.addAll(jobs);
for (Job job : jobs) {
processJobAddition(job);
}
notifyAll();
}
private void processJobAddition(@NonNull Job job) {
if (isJobActive(job) && isGroupIdAvailable(job)) {
setGroupIdUnavailable(job);
} else if (!isGroupIdAvailable(job)) {
Job blockingJob = activeGroupIds.get(job.getGroupId());
blockingJob.resetRunStats();
}
}
synchronized void push(Job job) {
jobQueue.addFirst(job);
}
@ -73,9 +90,9 @@ class JobQueue {
while (iterator.hasNext()) {
Job job = iterator.next();
if (job.isRequirementsMet() && isGroupIdAvailable(job.getGroupId())) {
if (job.isRequirementsMet() && isGroupIdAvailable(job)) {
iterator.remove();
setGroupIdUnavailable(job.getGroupId());
setGroupIdUnavailable(job);
return job;
}
}
@ -83,13 +100,19 @@ class JobQueue {
return null;
}
private boolean isGroupIdAvailable(String groupId) {
return groupId == null || !activeGroupIds.contains(groupId);
private boolean isJobActive(@NonNull Job job) {
return job.getRetryUntil() > 0 && job.getRunIteration() > 0;
}
private void setGroupIdUnavailable(String groupId) {
private boolean isGroupIdAvailable(@NonNull Job requester) {
String groupId = requester.getGroupId();
return groupId == null || !activeGroupIds.containsKey(groupId) || activeGroupIds.get(groupId).equals(requester);
}
private void setGroupIdUnavailable(@NonNull Job job) {
String groupId = job.getGroupId();
if (groupId != null) {
activeGroupIds.add(groupId);
activeGroupIds.put(groupId, job);
}
}
}

View File

@ -0,0 +1,35 @@
package org.thoughtcrime.securesms.jobmanager.requirements;
import android.app.AlarmManager;
import android.app.PendingIntent;
import android.content.BroadcastReceiver;
import android.content.Context;
import android.content.Intent;
import android.support.annotation.NonNull;
import android.util.Log;
import org.thoughtcrime.securesms.ApplicationContext;
import org.thoughtcrime.securesms.BuildConfig;
import java.util.UUID;
public class BackoffReceiver extends BroadcastReceiver {
private static final String TAG = BackoffReceiver.class.getSimpleName();
@Override
public void onReceive(Context context, Intent intent) {
Log.i(TAG, "Received an alarm to retry a job with ID: " + intent.getAction());
ApplicationContext.getInstance(context).getJobManager().onRequirementStatusChanged();
}
public static void setUniqueAlarm(@NonNull Context context, long time) {
AlarmManager alarmManager = (AlarmManager) context.getSystemService(Context.ALARM_SERVICE);
Intent intent = new Intent(context, BackoffReceiver.class);
intent.setAction(BuildConfig.APPLICATION_ID + UUID.randomUUID().toString());
alarmManager.set(AlarmManager.RTC_WAKEUP, time, PendingIntent.getBroadcast(context, 0, intent, 0));
Log.i(TAG, "Set an alarm to retry a job in " + (time - System.currentTimeMillis()) + " ms with ID: " + intent.getAction());
}
}

View File

@ -0,0 +1,50 @@
package org.thoughtcrime.securesms.jobmanager.requirements;
import android.content.Context;
import android.support.annotation.NonNull;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.dependencies.ContextDependent;
import java.util.concurrent.TimeUnit;
/**
* Uses exponential backoff to re-schedule network jobs to be retried in the future.
*/
public class NetworkBackoffRequirement implements Requirement, ContextDependent {
private static final long MAX_WAIT = TimeUnit.SECONDS.toMillis(30);
private transient Context context;
public NetworkBackoffRequirement(@NonNull Context context) {
this.context = context.getApplicationContext();
}
@Override
public boolean isPresent(@NonNull Job job) {
return new NetworkRequirement(context).isPresent() && System.currentTimeMillis() >= calculateNextRunTime(job);
}
@Override
public void onRetry(@NonNull Job job) {
if (!(new NetworkRequirement(context).isPresent())) {
job.resetRunStats();
return;
}
BackoffReceiver.setUniqueAlarm(context, NetworkBackoffRequirement.calculateNextRunTime(job));
}
@Override
public void setContext(Context context) {
this.context = context.getApplicationContext();
}
private static long calculateNextRunTime(@NonNull Job job) {
long targetTime = job.getLastRunTime() + (long) (Math.pow(2, job.getRunIteration() - 1) * 1000);
long furthestTime = System.currentTimeMillis() + MAX_WAIT;
return Math.min(targetTime, Math.min(furthestTime, job.getRetryUntil()));
}
}

View File

@ -25,7 +25,7 @@ import org.thoughtcrime.securesms.jobmanager.dependencies.ContextDependent;
/**
* A requirement that is satisfied when a network connection is present.
*/
public class NetworkRequirement implements Requirement, ContextDependent {
public class NetworkRequirement extends SimpleRequirement implements ContextDependent {
private transient Context context;

View File

@ -16,6 +16,10 @@
*/
package org.thoughtcrime.securesms.jobmanager.requirements;
import android.support.annotation.NonNull;
import org.thoughtcrime.securesms.jobmanager.Job;
import java.io.Serializable;
/**
@ -25,5 +29,7 @@ public interface Requirement extends Serializable {
/**
* @return true if the requirement is satisfied, false otherwise.
*/
public boolean isPresent();
boolean isPresent(@NonNull Job job);
void onRetry(@NonNull Job job);
}

View File

@ -0,0 +1,19 @@
package org.thoughtcrime.securesms.jobmanager.requirements;
import android.support.annotation.NonNull;
import org.thoughtcrime.securesms.jobmanager.Job;
public abstract class SimpleRequirement implements Requirement {
@Override
public boolean isPresent(@NonNull Job job) {
return isPresent();
}
@Override
public void onRetry(@NonNull Job job) {
}
public abstract boolean isPresent();
}

View File

@ -12,6 +12,7 @@ import java.io.IOException;
public class PushContentReceiveJob extends PushReceivedJob {
private static final long serialVersionUID = 5685475456901715638L;
private static final String TAG = PushContentReceiveJob.class.getSimpleName();
private final String data;

View File

@ -16,7 +16,7 @@ import org.thoughtcrime.securesms.database.Address;
import org.thoughtcrime.securesms.database.DatabaseFactory;
import org.thoughtcrime.securesms.events.PartProgressEvent;
import org.thoughtcrime.securesms.jobmanager.JobParameters;
import org.thoughtcrime.securesms.jobmanager.requirements.NetworkRequirement;
import org.thoughtcrime.securesms.jobmanager.requirements.NetworkBackoffRequirement;
import org.thoughtcrime.securesms.jobs.requirements.MasterSecretRequirement;
import org.thoughtcrime.securesms.mms.DecryptableStreamUriLoader;
import org.thoughtcrime.securesms.mms.OutgoingMediaMessage;
@ -38,9 +38,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public abstract class PushSendJob extends SendJob {
private static final long serialVersionUID = 5906098204770900739L;
private static final String TAG = PushSendJob.class.getSimpleName();
protected PushSendJob(Context context, JobParameters parameters) {
@ -52,8 +54,8 @@ public abstract class PushSendJob extends SendJob {
builder.withPersistence();
builder.withGroupId(destination.serialize());
builder.withRequirement(new MasterSecretRequirement(context));
builder.withRequirement(new NetworkRequirement(context));
builder.withRetryCount(5);
builder.withRequirement(new NetworkBackoffRequirement(context));
builder.withRetryDuration(TimeUnit.DAYS.toMillis(1));
return builder.create();
}

View File

@ -29,6 +29,7 @@ import java.util.ArrayList;
public class SmsSendJob extends SendJob {
private static final long serialVersionUID = -5118520036244759718L;
private static final String TAG = SmsSendJob.class.getSimpleName();
private final long messageId;

View File

@ -18,6 +18,7 @@ import org.thoughtcrime.securesms.service.SmsDeliveryListener;
public class SmsSentJob extends MasterSecretJob {
private static final long serialVersionUID = -2624694558755317560L;
private static final String TAG = SmsSentJob.class.getSimpleName();
private final long messageId;

View File

@ -4,9 +4,10 @@ import android.content.Context;
import org.thoughtcrime.securesms.jobmanager.dependencies.ContextDependent;
import org.thoughtcrime.securesms.jobmanager.requirements.Requirement;
import org.thoughtcrime.securesms.jobmanager.requirements.SimpleRequirement;
import org.thoughtcrime.securesms.service.KeyCachingService;
public class MasterSecretRequirement implements Requirement, ContextDependent {
public class MasterSecretRequirement extends SimpleRequirement implements ContextDependent {
private transient Context context;

View File

@ -5,8 +5,9 @@ import android.content.Context;
import org.thoughtcrime.securesms.jobmanager.dependencies.ContextDependent;
import org.thoughtcrime.securesms.jobmanager.requirements.NetworkRequirement;
import org.thoughtcrime.securesms.jobmanager.requirements.Requirement;
import org.thoughtcrime.securesms.jobmanager.requirements.SimpleRequirement;
public class NetworkOrServiceRequirement implements Requirement, ContextDependent {
public class NetworkOrServiceRequirement extends SimpleRequirement implements ContextDependent {
private transient Context context;

View File

@ -4,9 +4,10 @@ import android.content.Context;
import org.thoughtcrime.securesms.jobmanager.dependencies.ContextDependent;
import org.thoughtcrime.securesms.jobmanager.requirements.Requirement;
import org.thoughtcrime.securesms.jobmanager.requirements.SimpleRequirement;
import org.thoughtcrime.securesms.sms.TelephonyServiceState;
public class ServiceRequirement implements Requirement, ContextDependent {
public class ServiceRequirement extends SimpleRequirement implements ContextDependent {
private static final String TAG = ServiceRequirement.class.getSimpleName();

View File

@ -6,9 +6,10 @@ import android.support.annotation.NonNull;
import org.thoughtcrime.securesms.jobmanager.dependencies.ContextDependent;
import org.thoughtcrime.securesms.jobmanager.requirements.Requirement;
import org.thoughtcrime.securesms.jobmanager.requirements.SimpleRequirement;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
public class SqlCipherMigrationRequirement implements Requirement, ContextDependent {
public class SqlCipherMigrationRequirement extends SimpleRequirement implements ContextDependent {
@SuppressWarnings("unused")
private static final String TAG = SqlCipherMigrationRequirement.class.getSimpleName();