Implement send support for resumable uploads behind a flag.

This commit is contained in:
Alex Hart
2020-04-16 17:06:18 -03:00
committed by Greyson Parrelli
parent 7c442865c5
commit 2afb939ee6
24 changed files with 913 additions and 97 deletions

View File

@@ -7,6 +7,7 @@ import android.os.Build;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.annotation.WorkerThread;
import org.greenrobot.eventbus.EventBus;
import org.thoughtcrime.securesms.R;
@@ -26,12 +27,15 @@ import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.mms.PartAuthority;
import org.thoughtcrime.securesms.service.GenericForegroundService;
import org.thoughtcrime.securesms.service.NotificationController;
import org.thoughtcrime.securesms.util.FeatureFlags;
import org.thoughtcrime.securesms.util.MediaMetadataRetrieverUtil;
import org.thoughtcrime.securesms.util.MediaUtil;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.SignalServiceMessageSender;
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment;
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentPointer;
import org.whispersystems.signalservice.api.push.exceptions.ResumeLocationInvalidException;
import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec;
import java.io.IOException;
import java.io.InputStream;
@@ -51,8 +55,8 @@ public final class AttachmentUploadJob extends BaseJob {
private static final long UPLOAD_REUSE_THRESHOLD = TimeUnit.DAYS.toMillis(3);
private static final String KEY_ROW_ID = "row_id";
private static final String KEY_UNIQUE_ID = "unique_id";
private static final String KEY_ROW_ID = "row_id";
private static final String KEY_UNIQUE_ID = "unique_id";
/**
* Foreground notification shows while uploading attachments above this.
@@ -89,6 +93,18 @@ public final class AttachmentUploadJob extends BaseJob {
@Override
public void onRun() throws Exception {
final ResumableUploadSpec resumableUploadSpec;
if (FeatureFlags.attachmentsV3()) {
Data inputData = requireInputData();
if (!inputData.hasString(ResumableUploadSpecJob.KEY_RESUME_SPEC)) {
throw new ResumeLocationInvalidException("V3 Attachment upload requires a ResumableUploadSpec");
}
resumableUploadSpec = ResumableUploadSpec.deserialize(inputData.getString(ResumableUploadSpecJob.KEY_RESUME_SPEC));
} else {
resumableUploadSpec = null;
}
SignalServiceMessageSender messageSender = ApplicationDependencies.getSignalServiceMessageSender();
AttachmentDatabase database = DatabaseFactory.getAttachmentDatabase(context);
DatabaseAttachment databaseAttachment = database.getAttachment(attachmentId);
@@ -108,7 +124,7 @@ public final class AttachmentUploadJob extends BaseJob {
Log.i(TAG, "Uploading attachment for message " + databaseAttachment.getMmsId() + " with ID " + databaseAttachment.getAttachmentId());
try (NotificationController notification = getNotificationForAttachment(databaseAttachment)) {
SignalServiceAttachment localAttachment = getAttachmentFor(databaseAttachment, notification);
SignalServiceAttachment localAttachment = getAttachmentFor(databaseAttachment, notification, resumableUploadSpec);
SignalServiceAttachmentPointer remoteAttachment = messageSender.uploadAttachment(localAttachment.asStream());
Attachment attachment = PointerAttachment.forPointer(Optional.of(remoteAttachment), null, databaseAttachment.getFastPreflightId()).get();
@@ -133,10 +149,12 @@ public final class AttachmentUploadJob extends BaseJob {
@Override
protected boolean onShouldRetry(@NonNull Exception exception) {
if (exception instanceof ResumeLocationInvalidException) return false;
return exception instanceof IOException;
}
private @NonNull SignalServiceAttachment getAttachmentFor(Attachment attachment, @Nullable NotificationController notification) throws InvalidAttachmentException {
private @NonNull SignalServiceAttachment getAttachmentFor(Attachment attachment, @Nullable NotificationController notification, @Nullable ResumableUploadSpec resumableUploadSpec) throws InvalidAttachmentException {
try {
if (attachment.getDataUri() == null || attachment.getSize() == 0) throw new IOException("Assertion failed, outgoing attachment has no data!");
InputStream is = PartAuthority.getAttachmentStream(context, attachment.getDataUri());
@@ -151,6 +169,7 @@ public final class AttachmentUploadJob extends BaseJob {
.withUploadTimestamp(System.currentTimeMillis())
.withCaption(attachment.getCaption())
.withCancelationSignal(this::isCanceled)
.withResumableUploadSpec(resumableUploadSpec)
.withListener((total, progress) -> {
EventBus.getDefault().postSticky(new PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress));
if (notification != null) {

View File

@@ -89,6 +89,7 @@ public final class JobManagerFactories {
put(RemoteConfigRefreshJob.KEY, new RemoteConfigRefreshJob.Factory());
put(RemoteDeleteSendJob.KEY, new RemoteDeleteSendJob.Factory());
put(RequestGroupInfoJob.KEY, new RequestGroupInfoJob.Factory());
put(ResumableUploadSpecJob.KEY, new ResumableUploadSpecJob.Factory());
put(StorageAccountRestoreJob.KEY, new StorageAccountRestoreJob.Factory());
put(RetrieveProfileAvatarJob.KEY, new RetrieveProfileAvatarJob.Factory());
put(RetrieveProfileJob.KEY, new RetrieveProfileJob.Factory());

View File

@@ -107,12 +107,11 @@ public class PushGroupSendJob extends PushSendJob {
throw new MmsException("Inactive group!");
}
MmsDatabase database = DatabaseFactory.getMmsDatabase(context);
OutgoingMediaMessage message = database.getOutgoingMessage(messageId);
JobManager.Chain compressAndUploadAttachment = createCompressingAndUploadAttachmentsChain(jobManager, message);
MmsDatabase database = DatabaseFactory.getMmsDatabase(context);
OutgoingMediaMessage message = database.getOutgoingMessage(messageId);
Set<String> attachmentUploadIds = enqueueCompressingAndUploadAttachmentsChains(jobManager, message);
compressAndUploadAttachment.then(new PushGroupSendJob(messageId, destination, filterAddress))
.enqueue();
jobManager.add(new PushGroupSendJob(messageId, destination, filterAddress), attachmentUploadIds);
} catch (NoSuchMessageException | MmsException e) {
Log.w(TAG, "Failed to enqueue message.", e);

View File

@@ -45,6 +45,7 @@ import org.whispersystems.signalservice.api.push.exceptions.UnregisteredUserExce
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.Set;
public class PushMediaSendJob extends PushSendJob {
@@ -72,12 +73,11 @@ public class PushMediaSendJob extends PushSendJob {
throw new AssertionError();
}
MmsDatabase database = DatabaseFactory.getMmsDatabase(context);
OutgoingMediaMessage message = database.getOutgoingMessage(messageId);
JobManager.Chain compressAndUploadAttachment = createCompressingAndUploadAttachmentsChain(jobManager, message);
MmsDatabase database = DatabaseFactory.getMmsDatabase(context);
OutgoingMediaMessage message = database.getOutgoingMessage(messageId);
Set<String> attachmentUploadIds = enqueueCompressingAndUploadAttachmentsChains(jobManager, message);
compressAndUploadAttachment.then(new PushMediaSendJob(messageId, recipient))
.enqueue();
jobManager.add(new PushMediaSendJob(messageId, recipient), attachmentUploadIds);
} catch (NoSuchMessageException | MmsException e) {
Log.w(TAG, "Failed to enqueue message.", e);

View File

@@ -57,8 +57,10 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public abstract class PushSendJob extends SendJob {
@@ -146,7 +148,7 @@ public abstract class PushSendJob extends SendJob {
return null;
}
protected static JobManager.Chain createCompressingAndUploadAttachmentsChain(@NonNull JobManager jobManager, OutgoingMediaMessage message) {
protected static Set<String> enqueueCompressingAndUploadAttachmentsChains(@NonNull JobManager jobManager, OutgoingMediaMessage message) {
List<Attachment> attachments = new LinkedList<>();
attachments.addAll(message.getAttachments());
@@ -162,12 +164,17 @@ public abstract class PushSendJob extends SendJob {
.map(Contact.Avatar::getAttachment).withoutNulls()
.toList());
List<AttachmentCompressionJob> compressionJobs = Stream.of(attachments).map(a -> AttachmentCompressionJob.fromAttachment((DatabaseAttachment) a, false, -1)).toList();
return new HashSet<>(Stream.of(attachments).map(a -> {
AttachmentUploadJob attachmentUploadJob = new AttachmentUploadJob(((DatabaseAttachment) a).getAttachmentId());
List<AttachmentUploadJob> attachmentJobs = Stream.of(attachments).map(a -> new AttachmentUploadJob(((DatabaseAttachment) a).getAttachmentId())).toList();
jobManager.startChain(AttachmentCompressionJob.fromAttachment((DatabaseAttachment) a, false, -1))
.then(new ResumableUploadSpecJob())
.then(attachmentUploadJob)
.enqueue();
return jobManager.startChain(compressionJobs)
.then(attachmentJobs);
return attachmentUploadJob.getId();
})
.toList());
}
protected @NonNull List<SignalServiceAttachment> getAttachmentPointersFor(List<Attachment> attachments) {

View File

@@ -0,0 +1,77 @@
package org.thoughtcrime.securesms.jobs;
import androidx.annotation.NonNull;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.util.FeatureFlags;
import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class ResumableUploadSpecJob extends BaseJob {
private static final String TAG = Log.tag(ResumableUploadSpecJob.class);
static final String KEY_RESUME_SPEC = "resume_spec";
public static final String KEY = "ResumableUploadSpecJob";
public ResumableUploadSpecJob() {
this(new Job.Parameters.Builder()
.addConstraint(NetworkConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED)
.build());
}
private ResumableUploadSpecJob(@NonNull Parameters parameters) {
super(parameters);
}
@Override
protected void onRun() throws Exception {
if (!FeatureFlags.attachmentsV3()) {
Log.i(TAG, "Attachments V3 is not enabled so there is nothing to do!");
}
ResumableUploadSpec resumableUploadSpec = ApplicationDependencies.getSignalServiceMessageSender()
.getResumableUploadSpec();
setOutputData(new Data.Builder()
.putString(KEY_RESUME_SPEC, resumableUploadSpec.serialize())
.build());
}
@Override
protected boolean onShouldRetry(@NonNull Exception e) {
return e instanceof IOException;
}
@Override
public @NonNull Data serialize() {
return Data.EMPTY;
}
@Override
public @NonNull String getFactoryKey() {
return KEY;
}
@Override
public void onFailure() {
}
public static class Factory implements Job.Factory<ResumableUploadSpecJob> {
@Override
public @NonNull ResumableUploadSpecJob create(@NonNull Parameters parameters, @NonNull Data data) {
return new ResumableUploadSpecJob(parameters);
}
}
}

View File

@@ -59,6 +59,7 @@ import org.thoughtcrime.securesms.jobs.PushMediaSendJob;
import org.thoughtcrime.securesms.jobs.PushTextSendJob;
import org.thoughtcrime.securesms.jobs.ReactionSendJob;
import org.thoughtcrime.securesms.jobs.RemoteDeleteSendJob;
import org.thoughtcrime.securesms.jobs.ResumableUploadSpecJob;
import org.thoughtcrime.securesms.jobs.SmsSendJob;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.mms.MmsException;
@@ -275,15 +276,17 @@ public class MessageSender {
AttachmentDatabase attachmentDatabase = DatabaseFactory.getAttachmentDatabase(context);
DatabaseAttachment databaseAttachment = attachmentDatabase.insertAttachmentForPreUpload(attachment);
Job compressionJob = AttachmentCompressionJob.fromAttachment(databaseAttachment, false, -1);
Job uploadJob = new AttachmentUploadJob(databaseAttachment.getAttachmentId());
Job compressionJob = AttachmentCompressionJob.fromAttachment(databaseAttachment, false, -1);
Job resumableUploadSpecJob = new ResumableUploadSpecJob();
Job uploadJob = new AttachmentUploadJob(databaseAttachment.getAttachmentId());
ApplicationDependencies.getJobManager()
.startChain(compressionJob)
.then(resumableUploadSpecJob)
.then(uploadJob)
.enqueue();
return new PreUploadResult(databaseAttachment.getAttachmentId(), Arrays.asList(compressionJob.getId(), uploadJob.getId()));
return new PreUploadResult(databaseAttachment.getAttachmentId(), Arrays.asList(compressionJob.getId(), resumableUploadSpecJob.getId(), uploadJob.getId()));
} catch (MmsException e) {
Log.w(TAG, "preUploadPushAttachment() - Failed to upload!", e);
return null;