Cancel typing jobs when you send a group message.

This commit is contained in:
Greyson Parrelli
2020-06-12 07:06:20 -07:00
committed by GitHub
parent 8891b6c930
commit 3fad007ae0
8 changed files with 118 additions and 19 deletions

View File

@@ -46,6 +46,7 @@ import org.whispersystems.signalservice.api.util.UptimeSleepTimer;
import org.whispersystems.signalservice.api.websocket.ConnectivityListener;
import java.util.UUID;
import java.util.concurrent.Executors;
/**
* Implementation of {@link ApplicationDependencies.Provider} that provides real app dependencies.
@@ -91,7 +92,7 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr
Optional.fromNullable(IncomingMessageObserver.getUnidentifiedPipe()),
Optional.of(new SecurityEventListener(context)),
provideClientZkOperations().getProfileOperations(),
SignalExecutors.UNBOUNDED);
SignalExecutors.newCachedBoundedExecutor("signal-messages", 1, 16));
}
@Override

View File

@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
@@ -150,6 +151,14 @@ class JobController {
}
}
@WorkerThread
synchronized void cancelAllInQueue(@NonNull String queue) {
Stream.of(runningJobs.values())
.filter(j -> Objects.equals(j.getParameters().getQueue(), queue))
.map(Job::getId)
.forEach(this::cancelJob);
}
@WorkerThread
synchronized void onRetry(@NonNull Job job) {
int nextRunAttempt = job.getRunAttempt() + 1;

View File

@@ -198,6 +198,13 @@ public class JobManager implements ConstraintObserver.Notifier {
executor.execute(() -> jobController.cancelJob(id));
}
/**
* Cancels all jobs in the specified queue. See {@link #cancel(String)} for details.
*/
public void cancelAllInQueue(@NonNull String queue) {
executor.execute(() -> jobController.cancelAllInQueue(queue));
}
/**
* Runs the specified job synchronously. Beware: All normal dependencies are respected, meaning
* you must take great care where you call this. It could take a very long time to complete!

View File

@@ -151,6 +151,9 @@ public class PushGroupSendJob extends PushSendJob {
List<NetworkFailure> existingNetworkFailures = message.getNetworkFailures();
List<IdentityKeyMismatch> existingIdentityMismatches = message.getIdentityKeyMismatches();
long threadId = DatabaseFactory.getThreadDatabase(context).getThreadIdFor(message.getRecipient());
ApplicationDependencies.getJobManager().cancelAllInQueue(TypingSendJob.getQueue(threadId));
if (database.isSent(messageId)) {
log(TAG, "Message " + messageId + " was already sent. Ignoring.");
return;

View File

@@ -15,6 +15,7 @@ import org.thoughtcrime.securesms.recipients.Recipient;
import org.thoughtcrime.securesms.recipients.RecipientUtil;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.CancelationException;
import org.whispersystems.signalservice.api.SignalServiceMessageSender;
import org.whispersystems.signalservice.api.crypto.UnidentifiedAccessPair;
import org.whispersystems.signalservice.api.messages.SignalServiceTypingMessage;
@@ -39,7 +40,7 @@ public class TypingSendJob extends BaseJob {
public TypingSendJob(long threadId, boolean typing) {
this(new Job.Parameters.Builder()
.setQueue("TYPING_" + threadId)
.setQueue(getQueue(threadId))
.setMaxAttempts(1)
.setLifespan(TimeUnit.SECONDS.toMillis(5))
.build(),
@@ -47,6 +48,10 @@ public class TypingSendJob extends BaseJob {
typing);
}
public static String getQueue(long threadId) {
return "TYPING_" + threadId;
}
private TypingSendJob(@NonNull Job.Parameters parameters, long threadId, boolean typing) {
super(parameters);
@@ -101,7 +106,16 @@ public class TypingSendJob extends BaseJob {
List<Optional<UnidentifiedAccessPair>> unidentifiedAccess = Stream.of(recipients).map(r -> UnidentifiedAccessUtil.getAccessFor(context, r)).toList();
SignalServiceTypingMessage typingMessage = new SignalServiceTypingMessage(typing ? Action.STARTED : Action.STOPPED, System.currentTimeMillis(), groupId);
messageSender.sendTyping(addresses, unidentifiedAccess, typingMessage);
if (isCanceled()) {
Log.w(TAG, "Canceled before send!");
return;
}
try {
messageSender.sendTyping(addresses, unidentifiedAccess, typingMessage, this::isCanceled);
} catch (CancelationException e) {
Log.w(TAG, "Canceled during send!");
}
}
@Override

View File

@@ -6,9 +6,13 @@ import com.google.android.gms.common.util.concurrent.NumberedThreadFactory;
import org.thoughtcrime.securesms.util.LinkedBlockingLifoQueue;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -26,6 +30,44 @@ public class SignalExecutors {
return executor;
}
/**
* ThreadPoolExecutor will only create a new thread if the provided queue returns false from
* offer(). That means if you give it an unbounded queue, it'll only ever create 1 thread, no
* matter how long the queue gets.
*
* But if you bound the queue and submit more runnables than there are threads, your task is
* rejected and throws an exception.
*
* So we make a queue that will always return false if it's non-empty to ensure new threads get
* created. Then, if a task gets rejected, we simply add it to the queue.
*/
public static ExecutorService newCachedBoundedExecutor(final String name, int minThreads, int maxThreads) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(minThreads,
maxThreads,
30,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>() {
@Override
public boolean offer(Runnable runnable) {
if (isEmpty()) {
return super.offer(runnable);
} else {
return false;
}
}
}, new NumberedThreadFactory(name));
threadPool.setRejectedExecutionHandler((runnable, executor) -> {
try {
executor.getQueue().put(runnable);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
return threadPool;
}
/**
* Returns an executor that prioritizes newer work. This is the opposite of a traditional executor,
* which processor work in FIFO order.