From 2afb939ee6fc7d33526877aab16ee7eb4fd93278 Mon Sep 17 00:00:00 2001 From: Alex Hart Date: Thu, 16 Apr 2020 17:06:18 -0300 Subject: [PATCH] Implement send support for resumable uploads behind a flag. --- .../securesms/jobs/AttachmentUploadJob.java | 27 ++- .../securesms/jobs/JobManagerFactories.java | 1 + .../securesms/jobs/PushGroupSendJob.java | 9 +- .../securesms/jobs/PushMediaSendJob.java | 10 +- .../securesms/jobs/PushSendJob.java | 17 +- .../jobs/ResumableUploadSpecJob.java | 77 ++++++++ .../securesms/sms/MessageSender.java | 9 +- .../api/SignalServiceMessageSender.java | 21 ++- .../crypto/AttachmentCipherOutputStream.java | 12 +- .../api/crypto/DigestingOutputStream.java | 2 +- .../api/crypto/SkippingOutputStream.java | 53 ++++++ .../api/messages/SignalServiceAttachment.java | 34 ++-- .../SignalServiceAttachmentStream.java | 60 +++--- .../ResumeLocationInvalidException.java | 14 ++ .../internal/push/NowhereBufferedSink.java | 176 ++++++++++++++++++ .../internal/push/PushAttachmentData.java | 37 ++-- .../internal/push/PushServiceSocket.java | 110 ++++++++++- .../AttachmentCipherOutputStreamFactory.java | 6 +- .../push/http/DigestingRequestBody.java | 14 +- .../push/http/ResumableUploadSpec.java | 93 +++++++++ .../src/main/proto/ResumableUploads.proto | 17 ++ .../api/crypto/AttachmentCipherTest.java | 2 +- .../api/crypto/SkippingOutputStreamTest.java | 136 ++++++++++++++ .../push/http/DigestingRequestBodyTest.java | 73 ++++++++ 24 files changed, 913 insertions(+), 97 deletions(-) create mode 100644 app/src/main/java/org/thoughtcrime/securesms/jobs/ResumableUploadSpecJob.java create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/api/crypto/SkippingOutputStream.java create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/api/push/exceptions/ResumeLocationInvalidException.java create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/NowhereBufferedSink.java create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/http/ResumableUploadSpec.java create mode 100644 libsignal/service/src/main/proto/ResumableUploads.proto create mode 100644 libsignal/service/src/test/java/org/whispersystems/signalservice/api/crypto/SkippingOutputStreamTest.java create mode 100644 libsignal/service/src/test/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBodyTest.java diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.java index 873d486ec5..1e08a9117b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.java @@ -7,6 +7,7 @@ import android.os.Build; import androidx.annotation.NonNull; import androidx.annotation.Nullable; +import androidx.annotation.WorkerThread; import org.greenrobot.eventbus.EventBus; import org.thoughtcrime.securesms.R; @@ -26,12 +27,15 @@ import org.thoughtcrime.securesms.logging.Log; import org.thoughtcrime.securesms.mms.PartAuthority; import org.thoughtcrime.securesms.service.GenericForegroundService; import org.thoughtcrime.securesms.service.NotificationController; +import org.thoughtcrime.securesms.util.FeatureFlags; import org.thoughtcrime.securesms.util.MediaMetadataRetrieverUtil; import org.thoughtcrime.securesms.util.MediaUtil; import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.signalservice.api.SignalServiceMessageSender; import org.whispersystems.signalservice.api.messages.SignalServiceAttachment; import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentPointer; +import org.whispersystems.signalservice.api.push.exceptions.ResumeLocationInvalidException; +import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec; import java.io.IOException; import java.io.InputStream; @@ -51,8 +55,8 @@ public final class AttachmentUploadJob extends BaseJob { private static final long UPLOAD_REUSE_THRESHOLD = TimeUnit.DAYS.toMillis(3); - private static final String KEY_ROW_ID = "row_id"; - private static final String KEY_UNIQUE_ID = "unique_id"; + private static final String KEY_ROW_ID = "row_id"; + private static final String KEY_UNIQUE_ID = "unique_id"; /** * Foreground notification shows while uploading attachments above this. @@ -89,6 +93,18 @@ public final class AttachmentUploadJob extends BaseJob { @Override public void onRun() throws Exception { + final ResumableUploadSpec resumableUploadSpec; + if (FeatureFlags.attachmentsV3()) { + Data inputData = requireInputData(); + if (!inputData.hasString(ResumableUploadSpecJob.KEY_RESUME_SPEC)) { + throw new ResumeLocationInvalidException("V3 Attachment upload requires a ResumableUploadSpec"); + } + + resumableUploadSpec = ResumableUploadSpec.deserialize(inputData.getString(ResumableUploadSpecJob.KEY_RESUME_SPEC)); + } else { + resumableUploadSpec = null; + } + SignalServiceMessageSender messageSender = ApplicationDependencies.getSignalServiceMessageSender(); AttachmentDatabase database = DatabaseFactory.getAttachmentDatabase(context); DatabaseAttachment databaseAttachment = database.getAttachment(attachmentId); @@ -108,7 +124,7 @@ public final class AttachmentUploadJob extends BaseJob { Log.i(TAG, "Uploading attachment for message " + databaseAttachment.getMmsId() + " with ID " + databaseAttachment.getAttachmentId()); try (NotificationController notification = getNotificationForAttachment(databaseAttachment)) { - SignalServiceAttachment localAttachment = getAttachmentFor(databaseAttachment, notification); + SignalServiceAttachment localAttachment = getAttachmentFor(databaseAttachment, notification, resumableUploadSpec); SignalServiceAttachmentPointer remoteAttachment = messageSender.uploadAttachment(localAttachment.asStream()); Attachment attachment = PointerAttachment.forPointer(Optional.of(remoteAttachment), null, databaseAttachment.getFastPreflightId()).get(); @@ -133,10 +149,12 @@ public final class AttachmentUploadJob extends BaseJob { @Override protected boolean onShouldRetry(@NonNull Exception exception) { + if (exception instanceof ResumeLocationInvalidException) return false; + return exception instanceof IOException; } - private @NonNull SignalServiceAttachment getAttachmentFor(Attachment attachment, @Nullable NotificationController notification) throws InvalidAttachmentException { + private @NonNull SignalServiceAttachment getAttachmentFor(Attachment attachment, @Nullable NotificationController notification, @Nullable ResumableUploadSpec resumableUploadSpec) throws InvalidAttachmentException { try { if (attachment.getDataUri() == null || attachment.getSize() == 0) throw new IOException("Assertion failed, outgoing attachment has no data!"); InputStream is = PartAuthority.getAttachmentStream(context, attachment.getDataUri()); @@ -151,6 +169,7 @@ public final class AttachmentUploadJob extends BaseJob { .withUploadTimestamp(System.currentTimeMillis()) .withCaption(attachment.getCaption()) .withCancelationSignal(this::isCanceled) + .withResumableUploadSpec(resumableUploadSpec) .withListener((total, progress) -> { EventBus.getDefault().postSticky(new PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress)); if (notification != null) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java index dbfca4cbff..bcafd58bd2 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java @@ -89,6 +89,7 @@ public final class JobManagerFactories { put(RemoteConfigRefreshJob.KEY, new RemoteConfigRefreshJob.Factory()); put(RemoteDeleteSendJob.KEY, new RemoteDeleteSendJob.Factory()); put(RequestGroupInfoJob.KEY, new RequestGroupInfoJob.Factory()); + put(ResumableUploadSpecJob.KEY, new ResumableUploadSpecJob.Factory()); put(StorageAccountRestoreJob.KEY, new StorageAccountRestoreJob.Factory()); put(RetrieveProfileAvatarJob.KEY, new RetrieveProfileAvatarJob.Factory()); put(RetrieveProfileJob.KEY, new RetrieveProfileJob.Factory()); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushGroupSendJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushGroupSendJob.java index e9d82523db..0aa51f25d0 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushGroupSendJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushGroupSendJob.java @@ -107,12 +107,11 @@ public class PushGroupSendJob extends PushSendJob { throw new MmsException("Inactive group!"); } - MmsDatabase database = DatabaseFactory.getMmsDatabase(context); - OutgoingMediaMessage message = database.getOutgoingMessage(messageId); - JobManager.Chain compressAndUploadAttachment = createCompressingAndUploadAttachmentsChain(jobManager, message); + MmsDatabase database = DatabaseFactory.getMmsDatabase(context); + OutgoingMediaMessage message = database.getOutgoingMessage(messageId); + Set attachmentUploadIds = enqueueCompressingAndUploadAttachmentsChains(jobManager, message); - compressAndUploadAttachment.then(new PushGroupSendJob(messageId, destination, filterAddress)) - .enqueue(); + jobManager.add(new PushGroupSendJob(messageId, destination, filterAddress), attachmentUploadIds); } catch (NoSuchMessageException | MmsException e) { Log.w(TAG, "Failed to enqueue message.", e); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushMediaSendJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushMediaSendJob.java index 95e53ed47b..b7ed4719ee 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushMediaSendJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushMediaSendJob.java @@ -45,6 +45,7 @@ import org.whispersystems.signalservice.api.push.exceptions.UnregisteredUserExce import java.io.FileNotFoundException; import java.io.IOException; import java.util.List; +import java.util.Set; public class PushMediaSendJob extends PushSendJob { @@ -72,12 +73,11 @@ public class PushMediaSendJob extends PushSendJob { throw new AssertionError(); } - MmsDatabase database = DatabaseFactory.getMmsDatabase(context); - OutgoingMediaMessage message = database.getOutgoingMessage(messageId); - JobManager.Chain compressAndUploadAttachment = createCompressingAndUploadAttachmentsChain(jobManager, message); + MmsDatabase database = DatabaseFactory.getMmsDatabase(context); + OutgoingMediaMessage message = database.getOutgoingMessage(messageId); + Set attachmentUploadIds = enqueueCompressingAndUploadAttachmentsChains(jobManager, message); - compressAndUploadAttachment.then(new PushMediaSendJob(messageId, recipient)) - .enqueue(); + jobManager.add(new PushMediaSendJob(messageId, recipient), attachmentUploadIds); } catch (NoSuchMessageException | MmsException e) { Log.w(TAG, "Failed to enqueue message.", e); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushSendJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushSendJob.java index 388c2833bf..fb33783964 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushSendJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushSendJob.java @@ -57,8 +57,10 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; public abstract class PushSendJob extends SendJob { @@ -146,7 +148,7 @@ public abstract class PushSendJob extends SendJob { return null; } - protected static JobManager.Chain createCompressingAndUploadAttachmentsChain(@NonNull JobManager jobManager, OutgoingMediaMessage message) { + protected static Set enqueueCompressingAndUploadAttachmentsChains(@NonNull JobManager jobManager, OutgoingMediaMessage message) { List attachments = new LinkedList<>(); attachments.addAll(message.getAttachments()); @@ -162,12 +164,17 @@ public abstract class PushSendJob extends SendJob { .map(Contact.Avatar::getAttachment).withoutNulls() .toList()); - List compressionJobs = Stream.of(attachments).map(a -> AttachmentCompressionJob.fromAttachment((DatabaseAttachment) a, false, -1)).toList(); + return new HashSet<>(Stream.of(attachments).map(a -> { + AttachmentUploadJob attachmentUploadJob = new AttachmentUploadJob(((DatabaseAttachment) a).getAttachmentId()); - List attachmentJobs = Stream.of(attachments).map(a -> new AttachmentUploadJob(((DatabaseAttachment) a).getAttachmentId())).toList(); + jobManager.startChain(AttachmentCompressionJob.fromAttachment((DatabaseAttachment) a, false, -1)) + .then(new ResumableUploadSpecJob()) + .then(attachmentUploadJob) + .enqueue(); - return jobManager.startChain(compressionJobs) - .then(attachmentJobs); + return attachmentUploadJob.getId(); + }) + .toList()); } protected @NonNull List getAttachmentPointersFor(List attachments) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/ResumableUploadSpecJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/ResumableUploadSpecJob.java new file mode 100644 index 0000000000..fac8fc8dd0 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/ResumableUploadSpecJob.java @@ -0,0 +1,77 @@ +package org.thoughtcrime.securesms.jobs; + +import androidx.annotation.NonNull; + +import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; +import org.thoughtcrime.securesms.jobmanager.Data; +import org.thoughtcrime.securesms.jobmanager.Job; +import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint; +import org.thoughtcrime.securesms.logging.Log; +import org.thoughtcrime.securesms.util.FeatureFlags; +import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public class ResumableUploadSpecJob extends BaseJob { + + private static final String TAG = Log.tag(ResumableUploadSpecJob.class); + + static final String KEY_RESUME_SPEC = "resume_spec"; + + public static final String KEY = "ResumableUploadSpecJob"; + + public ResumableUploadSpecJob() { + this(new Job.Parameters.Builder() + .addConstraint(NetworkConstraint.KEY) + .setLifespan(TimeUnit.DAYS.toMillis(1)) + .setMaxAttempts(Parameters.UNLIMITED) + .build()); + } + + private ResumableUploadSpecJob(@NonNull Parameters parameters) { + super(parameters); + } + + @Override + protected void onRun() throws Exception { + if (!FeatureFlags.attachmentsV3()) { + Log.i(TAG, "Attachments V3 is not enabled so there is nothing to do!"); + } + + ResumableUploadSpec resumableUploadSpec = ApplicationDependencies.getSignalServiceMessageSender() + .getResumableUploadSpec(); + + setOutputData(new Data.Builder() + .putString(KEY_RESUME_SPEC, resumableUploadSpec.serialize()) + .build()); + } + + @Override + protected boolean onShouldRetry(@NonNull Exception e) { + return e instanceof IOException; + } + + @Override + public @NonNull Data serialize() { + return Data.EMPTY; + } + + @Override + public @NonNull String getFactoryKey() { + return KEY; + } + + @Override + public void onFailure() { + + } + + public static class Factory implements Job.Factory { + + @Override + public @NonNull ResumableUploadSpecJob create(@NonNull Parameters parameters, @NonNull Data data) { + return new ResumableUploadSpecJob(parameters); + } + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/sms/MessageSender.java b/app/src/main/java/org/thoughtcrime/securesms/sms/MessageSender.java index 9adc5b1999..b3544def6a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/sms/MessageSender.java +++ b/app/src/main/java/org/thoughtcrime/securesms/sms/MessageSender.java @@ -59,6 +59,7 @@ import org.thoughtcrime.securesms.jobs.PushMediaSendJob; import org.thoughtcrime.securesms.jobs.PushTextSendJob; import org.thoughtcrime.securesms.jobs.ReactionSendJob; import org.thoughtcrime.securesms.jobs.RemoteDeleteSendJob; +import org.thoughtcrime.securesms.jobs.ResumableUploadSpecJob; import org.thoughtcrime.securesms.jobs.SmsSendJob; import org.thoughtcrime.securesms.logging.Log; import org.thoughtcrime.securesms.mms.MmsException; @@ -275,15 +276,17 @@ public class MessageSender { AttachmentDatabase attachmentDatabase = DatabaseFactory.getAttachmentDatabase(context); DatabaseAttachment databaseAttachment = attachmentDatabase.insertAttachmentForPreUpload(attachment); - Job compressionJob = AttachmentCompressionJob.fromAttachment(databaseAttachment, false, -1); - Job uploadJob = new AttachmentUploadJob(databaseAttachment.getAttachmentId()); + Job compressionJob = AttachmentCompressionJob.fromAttachment(databaseAttachment, false, -1); + Job resumableUploadSpecJob = new ResumableUploadSpecJob(); + Job uploadJob = new AttachmentUploadJob(databaseAttachment.getAttachmentId()); ApplicationDependencies.getJobManager() .startChain(compressionJob) + .then(resumableUploadSpecJob) .then(uploadJob) .enqueue(); - return new PreUploadResult(databaseAttachment.getAttachmentId(), Arrays.asList(compressionJob.getId(), uploadJob.getId())); + return new PreUploadResult(databaseAttachment.getAttachmentId(), Arrays.asList(compressionJob.getId(), resumableUploadSpecJob.getId(), uploadJob.getId())); } catch (MmsException e) { Log.w(TAG, "preUploadPushAttachment() - Failed to upload!", e); return null; diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java index 2eace93e33..e3d1ea9c0b 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java @@ -80,6 +80,7 @@ import org.whispersystems.signalservice.internal.push.StaleDevices; import org.whispersystems.signalservice.internal.push.exceptions.MismatchedDevicesException; import org.whispersystems.signalservice.internal.push.exceptions.StaleDevicesException; import org.whispersystems.signalservice.internal.push.http.AttachmentCipherOutputStreamFactory; +import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec; import org.whispersystems.signalservice.internal.util.StaticCredentialsProvider; import org.whispersystems.signalservice.internal.util.Util; import org.whispersystems.util.Base64; @@ -350,16 +351,18 @@ public class SignalServiceMessageSender { } public SignalServiceAttachmentPointer uploadAttachment(SignalServiceAttachmentStream attachment) throws IOException { - byte[] attachmentKey = Util.getSecretBytes(64); + byte[] attachmentKey = attachment.getResumableUploadSpec().transform(ResumableUploadSpec::getSecretKey).or(() -> Util.getSecretBytes(64)); + byte[] attachmentIV = attachment.getResumableUploadSpec().transform(ResumableUploadSpec::getIV).or(() -> Util.getSecretBytes(16)); long paddedLength = PaddingInputStream.getPaddedSize(attachment.getLength()); InputStream dataStream = new PaddingInputStream(attachment.getInputStream(), attachment.getLength()); long ciphertextLength = AttachmentCipherOutputStream.getCiphertextLength(paddedLength); PushAttachmentData attachmentData = new PushAttachmentData(attachment.getContentType(), dataStream, ciphertextLength, - new AttachmentCipherOutputStreamFactory(attachmentKey), + new AttachmentCipherOutputStreamFactory(attachmentKey, attachmentIV), attachment.getListener(), - attachment.getCancelationSignal()); + attachment.getCancelationSignal(), + attachment.getResumableUploadSpec().orNull()); if (attachmentsV3.get()) { return uploadAttachmentV3(attachment, attachmentKey, attachmentData); @@ -403,7 +406,7 @@ public class SignalServiceMessageSender { attachment.getUploadTimestamp()); } - private SignalServiceAttachmentPointer uploadAttachmentV3(SignalServiceAttachmentStream attachment, byte[] attachmentKey, PushAttachmentData attachmentData) throws IOException { + public ResumableUploadSpec getResumableUploadSpec() throws IOException { AttachmentV3UploadAttributes v3UploadAttributes = null; Optional localPipe = pipe.get(); @@ -421,9 +424,13 @@ public class SignalServiceMessageSender { v3UploadAttributes = socket.getAttachmentV3UploadAttributes(); } - byte[] digest = socket.uploadAttachment(attachmentData, v3UploadAttributes); - return new SignalServiceAttachmentPointer(v3UploadAttributes.getCdn(), - new SignalServiceAttachmentRemoteId(v3UploadAttributes.getKey()), + return socket.getResumableUploadSpec(v3UploadAttributes); + } + + private SignalServiceAttachmentPointer uploadAttachmentV3(SignalServiceAttachmentStream attachment, byte[] attachmentKey, PushAttachmentData attachmentData) throws IOException { + byte[] digest = socket.uploadAttachment(attachmentData); + return new SignalServiceAttachmentPointer(attachmentData.getResumableUploadSpec().getCdnNumber(), + new SignalServiceAttachmentRemoteId(attachmentData.getResumableUploadSpec().getCdnKey()), attachment.getContentType(), attachmentKey, Optional.of(Util.toIntExact(attachment.getLength())), diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherOutputStream.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherOutputStream.java index 3e89b4046c..5c9804da4b 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherOutputStream.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherOutputStream.java @@ -10,6 +10,7 @@ import org.whispersystems.signalservice.internal.util.Util; import java.io.IOException; import java.io.OutputStream; +import java.security.InvalidAlgorithmParameterException; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; @@ -18,6 +19,7 @@ import javax.crypto.Cipher; import javax.crypto.IllegalBlockSizeException; import javax.crypto.Mac; import javax.crypto.NoSuchPaddingException; +import javax.crypto.spec.IvParameterSpec; import javax.crypto.spec.SecretKeySpec; public class AttachmentCipherOutputStream extends DigestingOutputStream { @@ -26,6 +28,7 @@ public class AttachmentCipherOutputStream extends DigestingOutputStream { private final Mac mac; public AttachmentCipherOutputStream(byte[] combinedKeyMaterial, + byte[] iv, OutputStream outputStream) throws IOException { @@ -35,12 +38,17 @@ public class AttachmentCipherOutputStream extends DigestingOutputStream { this.mac = initializeMac(); byte[][] keyParts = Util.split(combinedKeyMaterial, 32, 32); - this.cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(keyParts[0], "AES")); + if (iv == null) { + this.cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(keyParts[0], "AES")); + } else { + this.cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(keyParts[0], "AES"), new IvParameterSpec(iv)); + } + this.mac.init(new SecretKeySpec(keyParts[1], "HmacSHA256")); mac.update(cipher.getIV()); super.write(cipher.getIV()); - } catch (InvalidKeyException e) { + } catch (InvalidKeyException | InvalidAlgorithmParameterException e) { throw new AssertionError(e); } } diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/crypto/DigestingOutputStream.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/crypto/DigestingOutputStream.java index 555c1e855d..51322b0f84 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/crypto/DigestingOutputStream.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/crypto/DigestingOutputStream.java @@ -17,7 +17,7 @@ public abstract class DigestingOutputStream extends FilterOutputStream { super(outputStream); try { - this.runningDigest = MessageDigest.getInstance("SHA256"); + this.runningDigest = MessageDigest.getInstance("SHA-256"); } catch (NoSuchAlgorithmException e) { throw new AssertionError(e); } diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/crypto/SkippingOutputStream.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/crypto/SkippingOutputStream.java new file mode 100644 index 0000000000..daf9734fe6 --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/crypto/SkippingOutputStream.java @@ -0,0 +1,53 @@ +package org.whispersystems.signalservice.api.crypto; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; + +/** + * SkippingOutputStream will skip a number of bytes being written as specified by toSkip and then + * continue writing all remaining bytes to the wrapped output stream. + */ +public class SkippingOutputStream extends FilterOutputStream { + + private long toSkip; + + public SkippingOutputStream(long toSkip, OutputStream wrapped) { + super(wrapped); + this.toSkip = toSkip; + } + + public void write(int b) throws IOException { + if (toSkip > 0) { + toSkip--; + } else { + out.write(b); + } + } + + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + public void write(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } + + if (off < 0 || off > b.length || len < 0 || len + off > b.length || len + off < 0) { + throw new IndexOutOfBoundsException(); + } + + if (toSkip > 0) { + if (len <= toSkip) { + toSkip -= len; + } else { + out.write(b, off + (int) toSkip, len - (int) toSkip); + toSkip = 0; + } + } else { + out.write(b, off, len); + } + } +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/messages/SignalServiceAttachment.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/messages/SignalServiceAttachment.java index 61c4385a4c..5038261d62 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/messages/SignalServiceAttachment.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/messages/SignalServiceAttachment.java @@ -8,6 +8,7 @@ package org.whispersystems.signalservice.api.messages; import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.signalservice.internal.push.http.CancelationSignal; +import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec; import java.io.InputStream; @@ -40,18 +41,19 @@ public abstract class SignalServiceAttachment { public static class Builder { - private InputStream inputStream; - private String contentType; - private String fileName; - private long length; - private ProgressListener listener; - private CancelationSignal cancelationSignal; - private boolean voiceNote; - private int width; - private int height; - private String caption; - private String blurHash; - private long uploadTimestamp; + private InputStream inputStream; + private String contentType; + private String fileName; + private long length; + private ProgressListener listener; + private CancelationSignal cancelationSignal; + private boolean voiceNote; + private int width; + private int height; + private String caption; + private String blurHash; + private long uploadTimestamp; + private ResumableUploadSpec resumableUploadSpec; private Builder() {} @@ -115,6 +117,11 @@ public abstract class SignalServiceAttachment { return this; } + public Builder withResumableUploadSpec(ResumableUploadSpec resumableUploadSpec) { + this.resumableUploadSpec = resumableUploadSpec; + return this; + } + public SignalServiceAttachmentStream build() { if (inputStream == null) throw new IllegalArgumentException("Must specify stream!"); if (contentType == null) throw new IllegalArgumentException("No content type specified!"); @@ -132,7 +139,8 @@ public abstract class SignalServiceAttachment { Optional.fromNullable(caption), Optional.fromNullable(blurHash), listener, - cancelationSignal); + cancelationSignal, + Optional.fromNullable(resumableUploadSpec)); } } diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/messages/SignalServiceAttachmentStream.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/messages/SignalServiceAttachmentStream.java index 64234a1280..5a959dcea7 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/messages/SignalServiceAttachmentStream.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/messages/SignalServiceAttachmentStream.java @@ -8,6 +8,7 @@ package org.whispersystems.signalservice.api.messages; import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.signalservice.internal.push.http.CancelationSignal; +import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec; import java.io.InputStream; @@ -16,21 +17,22 @@ import java.io.InputStream; */ public class SignalServiceAttachmentStream extends SignalServiceAttachment { - private final InputStream inputStream; - private final long length; - private final Optional fileName; - private final ProgressListener listener; - private final CancelationSignal cancelationSignal; - private final Optional preview; - private final boolean voiceNote; - private final int width; - private final int height; - private final long uploadTimestamp; - private final Optional caption; - private final Optional blurHash; + private final InputStream inputStream; + private final long length; + private final Optional fileName; + private final ProgressListener listener; + private final CancelationSignal cancelationSignal; + private final Optional preview; + private final boolean voiceNote; + private final int width; + private final int height; + private final long uploadTimestamp; + private final Optional caption; + private final Optional blurHash; + private final Optional resumableUploadSpec; public SignalServiceAttachmentStream(InputStream inputStream, String contentType, long length, Optional fileName, boolean voiceNote, ProgressListener listener, CancelationSignal cancelationSignal) { - this(inputStream, contentType, length, fileName, voiceNote, Optional.absent(), 0, 0, System.currentTimeMillis(), Optional.absent(), Optional.absent(), listener, cancelationSignal); + this(inputStream, contentType, length, fileName, voiceNote, Optional.absent(), 0, 0, System.currentTimeMillis(), Optional.absent(), Optional.absent(), listener, cancelationSignal, Optional.absent()); } public SignalServiceAttachmentStream(InputStream inputStream, @@ -45,21 +47,23 @@ public class SignalServiceAttachmentStream extends SignalServiceAttachment { Optional caption, Optional blurHash, ProgressListener listener, - CancelationSignal cancelationSignal) + CancelationSignal cancelationSignal, + Optional resumableUploadSpec) { super(contentType); - this.inputStream = inputStream; - this.length = length; - this.fileName = fileName; - this.listener = listener; - this.voiceNote = voiceNote; - this.preview = preview; - this.width = width; - this.height = height; - this.uploadTimestamp = uploadTimestamp; - this.caption = caption; - this.blurHash = blurHash; - this.cancelationSignal = cancelationSignal; + this.inputStream = inputStream; + this.length = length; + this.fileName = fileName; + this.listener = listener; + this.voiceNote = voiceNote; + this.preview = preview; + this.width = width; + this.height = height; + this.uploadTimestamp = uploadTimestamp; + this.caption = caption; + this.blurHash = blurHash; + this.cancelationSignal = cancelationSignal; + this.resumableUploadSpec = resumableUploadSpec; } @Override @@ -119,4 +123,8 @@ public class SignalServiceAttachmentStream extends SignalServiceAttachment { public long getUploadTimestamp() { return uploadTimestamp; } + + public Optional getResumableUploadSpec() { + return resumableUploadSpec; + } } diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/push/exceptions/ResumeLocationInvalidException.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/push/exceptions/ResumeLocationInvalidException.java new file mode 100644 index 0000000000..9445270c70 --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/push/exceptions/ResumeLocationInvalidException.java @@ -0,0 +1,14 @@ +package org.whispersystems.signalservice.api.push.exceptions; + +import java.io.IOException; + +public class ResumeLocationInvalidException extends IOException { + + public ResumeLocationInvalidException() { + super(); + } + + public ResumeLocationInvalidException(String s) { + super(s); + } +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/NowhereBufferedSink.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/NowhereBufferedSink.java new file mode 100644 index 0000000000..a566c9827b --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/NowhereBufferedSink.java @@ -0,0 +1,176 @@ +package org.whispersystems.signalservice.internal.push; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; + +import okio.Buffer; +import okio.BufferedSink; +import okio.ByteString; +import okio.Source; +import okio.Timeout; + +/** + * NowhereBufferedSync allows a programmer to write out data into the void. This has no memory + * implications, as we don't actually store bytes. Supports getting an OutputStream, which also + * just writes into the void. + */ +public class NowhereBufferedSink implements BufferedSink { + @Override + public Buffer buffer() { + return null; + } + + @Override + public BufferedSink write(ByteString byteString) throws IOException { + return this; + } + + @Override + public BufferedSink write(byte[] source) throws IOException { + return this; + } + + @Override + public BufferedSink write(byte[] source, int offset, int byteCount) throws IOException { + return this; + } + + @Override + public long writeAll(Source source) throws IOException { + return 0; + } + + @Override + public BufferedSink write(Source source, long byteCount) throws IOException { + return this; + } + + @Override + public BufferedSink writeUtf8(String string) throws IOException { + return this; + } + + @Override + public BufferedSink writeUtf8(String string, int beginIndex, int endIndex) throws IOException { + return this; + } + + @Override + public BufferedSink writeUtf8CodePoint(int codePoint) throws IOException { + return this; + } + + @Override + public BufferedSink writeString(String string, Charset charset) throws IOException { + return this; + } + + @Override + public BufferedSink writeString(String string, int beginIndex, int endIndex, Charset charset) throws IOException { + return this; + } + + @Override + public BufferedSink writeByte(int b) throws IOException { + return this; + } + + @Override + public BufferedSink writeShort(int s) throws IOException { + return this; + } + + @Override + public BufferedSink writeShortLe(int s) throws IOException { + return this; + } + + @Override + public BufferedSink writeInt(int i) throws IOException { + return this; + } + + @Override + public BufferedSink writeIntLe(int i) throws IOException { + return this; + } + + @Override + public BufferedSink writeLong(long v) throws IOException { + return this; + } + + @Override + public BufferedSink writeLongLe(long v) throws IOException { + return this; + } + + @Override + public BufferedSink writeDecimalLong(long v) throws IOException { + return this; + } + + @Override + public BufferedSink writeHexadecimalUnsignedLong(long v) throws IOException { + return this; + } + + @Override + public void write(Buffer source, long byteCount) throws IOException { + + } + + @Override + public void flush() throws IOException { + + } + + @Override + public Timeout timeout() { + return null; + } + + @Override + public void close() throws IOException { + + } + + @Override + public BufferedSink emit() throws IOException { + return this; + } + + @Override + public BufferedSink emitCompleteSegments() throws IOException { + return this; + } + + @Override + public OutputStream outputStream() { + return new OutputStream() { + @Override + public void write(int i) throws IOException { + } + + @Override + public void write(byte[] bytes) throws IOException { + } + + @Override + public void write(byte[] bytes, int i, int i1) throws IOException { + } + }; + } + + @Override + public int write(ByteBuffer byteBuffer) throws IOException { + return 0; + } + + @Override + public boolean isOpen() { + return false; + } +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/PushAttachmentData.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/PushAttachmentData.java index 42945a4e20..536af853e3 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/PushAttachmentData.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/PushAttachmentData.java @@ -9,28 +9,32 @@ package org.whispersystems.signalservice.internal.push; import org.whispersystems.signalservice.api.messages.SignalServiceAttachment.ProgressListener; import org.whispersystems.signalservice.internal.push.http.CancelationSignal; import org.whispersystems.signalservice.internal.push.http.OutputStreamFactory; +import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec; import java.io.InputStream; public class PushAttachmentData { - private final String contentType; - private final InputStream data; - private final long dataSize; - private final OutputStreamFactory outputStreamFactory; - private final ProgressListener listener; - private final CancelationSignal cancelationSignal; + private final String contentType; + private final InputStream data; + private final long dataSize; + private final OutputStreamFactory outputStreamFactory; + private final ProgressListener listener; + private final CancelationSignal cancelationSignal; + private final ResumableUploadSpec resumableUploadSpec; public PushAttachmentData(String contentType, InputStream data, long dataSize, - OutputStreamFactory outputStreamFactory, ProgressListener listener, - CancelationSignal cancelationSignal) + OutputStreamFactory outputStreamFactory, + ProgressListener listener, CancelationSignal cancelationSignal, + ResumableUploadSpec resumableUploadSpec) { - this.contentType = contentType; - this.data = data; - this.dataSize = dataSize; - this.outputStreamFactory = outputStreamFactory; - this.listener = listener; - this.cancelationSignal = cancelationSignal; + this.contentType = contentType; + this.data = data; + this.dataSize = dataSize; + this.outputStreamFactory = outputStreamFactory; + this.resumableUploadSpec = resumableUploadSpec; + this.listener = listener; + this.cancelationSignal = cancelationSignal; } public String getContentType() { @@ -56,4 +60,9 @@ public class PushAttachmentData { public CancelationSignal getCancelationSignal() { return cancelationSignal; } + + public ResumableUploadSpec getResumableUploadSpec() { + return resumableUploadSpec; + } + } diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java index eefddf6037..19e98a14eb 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java @@ -56,6 +56,7 @@ import org.whispersystems.signalservice.api.push.exceptions.NotFoundException; import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException; import org.whispersystems.signalservice.api.push.exceptions.RateLimitException; import org.whispersystems.signalservice.api.push.exceptions.RemoteAttestationResponseExpiredException; +import org.whispersystems.signalservice.api.push.exceptions.ResumeLocationInvalidException; import org.whispersystems.signalservice.api.push.exceptions.UnregisteredUserException; import org.whispersystems.signalservice.api.push.exceptions.UsernameMalformedException; import org.whispersystems.signalservice.api.push.exceptions.UsernameTakenException; @@ -77,6 +78,7 @@ import org.whispersystems.signalservice.internal.push.http.CancelationSignal; import org.whispersystems.signalservice.internal.push.http.DigestingRequestBody; import org.whispersystems.signalservice.internal.push.http.NoCipherOutputStreamFactory; import org.whispersystems.signalservice.internal.push.http.OutputStreamFactory; +import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec; import org.whispersystems.signalservice.internal.storage.protos.ReadOperation; import org.whispersystems.signalservice.internal.storage.protos.StorageItems; import org.whispersystems.signalservice.internal.storage.protos.StorageManifest; @@ -191,6 +193,8 @@ public class PushServiceSocket { private static final Map NO_HEADERS = Collections.emptyMap(); private static final ResponseCodeHandler NO_HANDLER = new EmptyResponseCodeHandler(); + private static final long CDN2_RESUMABLE_LINK_LIFETIME_MILLIS = TimeUnit.DAYS.toMillis(7); + private long soTimeoutMillis = TimeUnit.SECONDS.toMillis(30); private final Set connections = new HashSet<>(); @@ -929,9 +933,22 @@ public class PushServiceSocket { return new Pair<>(id, digest); } - public byte[] uploadAttachment(PushAttachmentData attachment, AttachmentV3UploadAttributes uploadAttributes) throws IOException { - String resumableUploadUrl = getResumableUploadUrl(uploadAttributes.getSignedUploadLocation(), uploadAttributes.getHeaders()); - return uploadToCdn2(resumableUploadUrl, + public ResumableUploadSpec getResumableUploadSpec(AttachmentV3UploadAttributes uploadAttributes) throws IOException { + return new ResumableUploadSpec(Util.getSecretBytes(64), + Util.getSecretBytes(16), + uploadAttributes.getKey(), + uploadAttributes.getCdn(), + getResumableUploadUrl(uploadAttributes.getSignedUploadLocation(), uploadAttributes.getHeaders()), + System.currentTimeMillis() + CDN2_RESUMABLE_LINK_LIFETIME_MILLIS); + } + + public byte[] uploadAttachment(PushAttachmentData attachment) throws IOException { + + if (attachment.getResumableUploadSpec() == null || attachment.getResumableUploadSpec().getExpirationTimestamp() < System.currentTimeMillis()) { + throw new ResumeLocationInvalidException(); + } + + return uploadToCdn2(attachment.getResumableUploadSpec().getResumeLocation(), attachment.getData(), "application/octet-stream", attachment.getDataSize(), @@ -1036,7 +1053,7 @@ public class PushServiceSocket { .readTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS) .build(); - DigestingRequestBody file = new DigestingRequestBody(data, outputStreamFactory, contentType, length, progressListener, cancelationSignal); + DigestingRequestBody file = new DigestingRequestBody(data, outputStreamFactory, contentType, length, progressListener, cancelationSignal, 0); RequestBody requestBody = new MultipartBody.Builder() .setType(MultipartBody.FORM) @@ -1152,9 +1169,20 @@ public class PushServiceSocket { .readTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS) .build(); - DigestingRequestBody file = new DigestingRequestBody(data, outputStreamFactory, contentType, length, progressListener, cancelationSignal); + ResumeInfo resumeInfo = getResumeInfo(resumableUrl, length); + DigestingRequestBody file = new DigestingRequestBody(data, outputStreamFactory, contentType, length, progressListener, cancelationSignal, resumeInfo.contentStart); + + if (resumeInfo.contentStart == length) { + Log.w(TAG, "Resume start point == content length"); + try (NowhereBufferedSink buffer = new NowhereBufferedSink()) { + file.writeTo(buffer); + } + return file.getTransmittedDigest(); + } + Request.Builder request = new Request.Builder().url(resumableUrl) - .put(file); + .put(file) + .addHeader("Content-Range", resumeInfo.contentRange); if (connectionHolder.getHostHeader().isPresent()) { request.header("host", connectionHolder.getHostHeader().get()); @@ -1184,6 +1212,67 @@ public class PushServiceSocket { } } + private ResumeInfo getResumeInfo(String resumableUrl, long contentLength) throws IOException { + ConnectionHolder connectionHolder = getRandom(cdnClientsMap.get(2), random); + OkHttpClient okHttpClient = connectionHolder.getClient() + .newBuilder() + .connectTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS) + .readTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS) + .build(); + + final long offset; + final String contentRange; + + Request.Builder request = new Request.Builder().url(resumableUrl) + .put(RequestBody.create(null, "")) + .addHeader("Content-Range", String.format(Locale.US, "bytes */%d", contentLength)); + + if (connectionHolder.getHostHeader().isPresent()) { + request.header("host", connectionHolder.getHostHeader().get()); + } + + Call call = okHttpClient.newCall(request.build()); + + synchronized (connections) { + connections.add(call); + } + + try { + Response response; + + try { + response = call.execute(); + } catch (IOException e) { + throw new PushNetworkException(e); + } + + if (response.isSuccessful()) { + offset = contentLength; + contentRange = null; + } else if (response.code() == 308) { + String rangeCompleted = response.header("Range"); + + if (rangeCompleted == null) { + offset = 0; + } else { + offset = Long.parseLong(rangeCompleted.split("-")[1]) + 1; + } + + contentRange = String.format(Locale.US, "bytes %d-%d/%d", offset, contentLength - 1, contentLength); + } else if (response.code() == 404) { + throw new ResumeLocationInvalidException(); + } else { + throw new NonSuccessfulResponseCodeException("Response: " + response); + } + } finally { + synchronized (connections) { + connections.remove(call); + } + } + + return new ResumeInfo(contentRange, offset); + } + private String makeServiceRequest(String urlFragment, String method, String jsonBody) throws NonSuccessfulResponseCodeException, PushNetworkException { @@ -1806,4 +1895,13 @@ public class PushServiceSocket { } } + private final class ResumeInfo { + private final String contentRange; + private final long contentStart; + + private ResumeInfo(String contentRange, long offset) { + this.contentRange = contentRange; + this.contentStart = offset; + } + } } diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/http/AttachmentCipherOutputStreamFactory.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/http/AttachmentCipherOutputStreamFactory.java index b807cc3878..f7fda59176 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/http/AttachmentCipherOutputStreamFactory.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/http/AttachmentCipherOutputStreamFactory.java @@ -10,14 +10,16 @@ import java.io.OutputStream; public class AttachmentCipherOutputStreamFactory implements OutputStreamFactory { private final byte[] key; + private final byte[] iv; - public AttachmentCipherOutputStreamFactory(byte[] key) { + public AttachmentCipherOutputStreamFactory(byte[] key, byte[] iv) { this.key = key; + this.iv = iv; } @Override public DigestingOutputStream createFor(OutputStream wrap) throws IOException { - return new AttachmentCipherOutputStream(key, wrap); + return new AttachmentCipherOutputStream(key, iv, wrap); } } diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBody.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBody.java index 652234f6b3..9de331fcb7 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBody.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBody.java @@ -1,7 +1,9 @@ package org.whispersystems.signalservice.internal.push.http; +import org.whispersystems.libsignal.util.guava.Preconditions; import org.whispersystems.signalservice.api.crypto.DigestingOutputStream; +import org.whispersystems.signalservice.api.crypto.SkippingOutputStream; import org.whispersystems.signalservice.api.messages.SignalServiceAttachment.ProgressListener; import java.io.IOException; @@ -19,6 +21,7 @@ public class DigestingRequestBody extends RequestBody { private final long contentLength; private final ProgressListener progressListener; private final CancelationSignal cancelationSignal; + private final long contentStart; private byte[] digest; @@ -26,14 +29,19 @@ public class DigestingRequestBody extends RequestBody { OutputStreamFactory outputStreamFactory, String contentType, long contentLength, ProgressListener progressListener, - CancelationSignal cancelationSignal) + CancelationSignal cancelationSignal, + long contentStart) { + Preconditions.checkArgument(contentLength >= contentStart); + Preconditions.checkArgument(contentStart >= 0); + this.inputStream = inputStream; this.outputStreamFactory = outputStreamFactory; this.contentType = contentType; this.contentLength = contentLength; this.progressListener = progressListener; this.cancelationSignal = cancelationSignal; + this.contentStart = contentStart; } @Override @@ -43,7 +51,7 @@ public class DigestingRequestBody extends RequestBody { @Override public void writeTo(BufferedSink sink) throws IOException { - DigestingOutputStream outputStream = outputStreamFactory.createFor(sink.outputStream()); + DigestingOutputStream outputStream = outputStreamFactory.createFor(new SkippingOutputStream(contentStart, sink.outputStream())); byte[] buffer = new byte[8192]; int read; @@ -68,7 +76,7 @@ public class DigestingRequestBody extends RequestBody { @Override public long contentLength() { - if (contentLength > 0) return contentLength; + if (contentLength > 0) return contentLength - contentStart; else return -1; } diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/http/ResumableUploadSpec.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/http/ResumableUploadSpec.java new file mode 100644 index 0000000000..5a0f221d10 --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/http/ResumableUploadSpec.java @@ -0,0 +1,93 @@ +package org.whispersystems.signalservice.internal.push.http; + +import com.google.protobuf.ByteString; + +import org.signal.protos.resumableuploads.ResumableUploads; +import org.whispersystems.libsignal.util.guava.Optional; +import org.whispersystems.libsignal.util.guava.Preconditions; +import org.whispersystems.signalservice.api.push.exceptions.ResumeLocationInvalidException; +import org.whispersystems.util.Base64; + +import java.io.IOException; + +public final class ResumableUploadSpec { + + private final byte[] secretKey; + private final byte[] iv; + + private final String cdnKey; + private final Integer cdnNumber; + private final String resumeLocation; + private final Long expirationTimestamp; + + public ResumableUploadSpec(byte[] secretKey, + byte[] iv, + String cdnKey, + int cdnNumber, + String resumeLocation, + long expirationTimestamp) + { + this.secretKey = secretKey; + this.iv = iv; + this.cdnKey = cdnKey; + this.cdnNumber = cdnNumber; + this.resumeLocation = resumeLocation; + this.expirationTimestamp = expirationTimestamp; + } + + public byte[] getSecretKey() { + return secretKey; + } + + public byte[] getIV() { + return iv; + } + + public String getCdnKey() { + return cdnKey; + } + + public Integer getCdnNumber() { + return cdnNumber; + } + + public String getResumeLocation() { + return resumeLocation; + } + + public Long getExpirationTimestamp() { + return expirationTimestamp; + } + + public String serialize() { + ResumableUploads.ResumableUpload.Builder builder = ResumableUploads.ResumableUpload.newBuilder() + .setSecretKey(ByteString.copyFrom(getSecretKey())) + .setIv(ByteString.copyFrom(getIV())) + .setTimeout(getExpirationTimestamp()) + .setCdnNumber(getCdnNumber()) + .setCdnKey(getCdnKey()) + .setLocation(getResumeLocation()) + .setTimeout(getExpirationTimestamp()); + + return Base64.encodeBytes(builder.build().toByteArray()); + } + + public static ResumableUploadSpec deserialize(String serializedSpec) throws ResumeLocationInvalidException { + if (serializedSpec == null) return null; + + try { + ResumableUploads.ResumableUpload resumableUpload = ResumableUploads.ResumableUpload.parseFrom(ByteString.copyFrom(Base64.decode(serializedSpec))); + + return new ResumableUploadSpec( + resumableUpload.getSecretKey().toByteArray(), + resumableUpload.getIv().toByteArray(), + resumableUpload.getCdnKey(), + resumableUpload.getCdnNumber(), + resumableUpload.getLocation(), + resumableUpload.getTimeout() + ); + } catch (IOException e) { + throw new ResumeLocationInvalidException(); + } + } +} diff --git a/libsignal/service/src/main/proto/ResumableUploads.proto b/libsignal/service/src/main/proto/ResumableUploads.proto new file mode 100644 index 0000000000..530c7b70e6 --- /dev/null +++ b/libsignal/service/src/main/proto/ResumableUploads.proto @@ -0,0 +1,17 @@ +/** + * Copyright (C) 2020 Open Whisper Systems + * + * Licensed according to the LICENSE file in this repository. + */ +syntax = "proto3"; + +option java_package = "org.signal.protos.resumableuploads"; + +message ResumableUpload { + bytes secretKey = 1; + bytes iv = 2; + string cdnKey = 3; + uint32 cdnNumber = 4; + string location = 5; + uint64 timeout = 6; +} \ No newline at end of file diff --git a/libsignal/service/src/test/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherTest.java b/libsignal/service/src/test/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherTest.java index 96f922372f..8dafeaf879 100644 --- a/libsignal/service/src/test/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherTest.java +++ b/libsignal/service/src/test/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherTest.java @@ -203,7 +203,7 @@ public class AttachmentCipherTest extends TestCase { private static EncryptResult encryptData(byte[] data, byte[] keyMaterial) throws IOException { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - AttachmentCipherOutputStream encryptStream = new AttachmentCipherOutputStream(keyMaterial, outputStream); + AttachmentCipherOutputStream encryptStream = new AttachmentCipherOutputStream(keyMaterial, null, outputStream); encryptStream.write(data); encryptStream.flush(); diff --git a/libsignal/service/src/test/java/org/whispersystems/signalservice/api/crypto/SkippingOutputStreamTest.java b/libsignal/service/src/test/java/org/whispersystems/signalservice/api/crypto/SkippingOutputStreamTest.java new file mode 100644 index 0000000000..9f831744b2 --- /dev/null +++ b/libsignal/service/src/test/java/org/whispersystems/signalservice/api/crypto/SkippingOutputStreamTest.java @@ -0,0 +1,136 @@ +package org.whispersystems.signalservice.api.crypto; + +import org.junit.Test; + +import java.io.ByteArrayOutputStream; + +import static org.junit.Assert.*; + +public class SkippingOutputStreamTest { + + private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + @Test + public void givenZeroToSkip_whenIWriteInt_thenIGetIntInOutput() throws Exception { + // GIVEN + SkippingOutputStream testSubject = new SkippingOutputStream(0, outputStream); + + // WHEN + testSubject.write(0); + + // THEN + assertEquals(1, outputStream.toByteArray().length); + assertEquals(0, outputStream.toByteArray()[0]); + } + + @Test + public void givenOneToSkip_whenIWriteIntTwice_thenIGetSecondIntInOutput() throws Exception { + // GIVEN + SkippingOutputStream testSubject = new SkippingOutputStream(1, outputStream); + + // WHEN + testSubject.write(0); + testSubject.write(1); + + // THEN + assertEquals(1, outputStream.toByteArray().length); + assertEquals(1, outputStream.toByteArray()[0]); + } + + @Test + public void givenZeroToSkip_whenIWriteArray_thenIGetArrayInOutput() throws Exception { + // GIVEN + byte[] expected = new byte[]{1, 2, 3, 4, 5}; + SkippingOutputStream testSubject = new SkippingOutputStream(0, outputStream); + + // WHEN + testSubject.write(expected); + + // THEN + assertEquals(expected.length, outputStream.toByteArray().length); + assertArrayEquals(expected, outputStream.toByteArray()); + } + + @Test + public void givenNonZeroToSkip_whenIWriteArray_thenIGetEndOfArrayInOutput() throws Exception { + // GIVEN + byte[] expected = new byte[]{1, 2, 3, 4, 5}; + SkippingOutputStream testSubject = new SkippingOutputStream(3, outputStream); + + // WHEN + testSubject.write(expected); + + // THEN + assertEquals(2, outputStream.toByteArray().length); + assertArrayEquals(new byte[]{4, 5}, outputStream.toByteArray()); + } + + @Test + public void givenSkipGreaterThanByteArray_whenIWriteArray_thenIGetNoOutput() throws Exception { + // GIVEN + byte[] array = new byte[]{1, 2, 3, 4, 5}; + SkippingOutputStream testSubject = new SkippingOutputStream(10, outputStream); + + // WHEN + testSubject.write(array); + + // THEN + assertEquals(0, outputStream.toByteArray().length); + } + + @Test + public void givenZeroToSkip_whenIWriteArrayRange_thenIGetArrayRangeInOutput() throws Exception { + // GIVEN + byte[] expected = new byte[]{1, 2, 3, 4, 5}; + SkippingOutputStream testSubject = new SkippingOutputStream(0, outputStream); + + // WHEN + testSubject.write(expected, 1, 3); + + // THEN + assertEquals(3, outputStream.toByteArray().length); + assertArrayEquals(new byte[]{2, 3, 4}, outputStream.toByteArray()); + } + + @Test + public void givenNonZeroToSkip_whenIWriteArrayRange_thenIGetEndOfArrayRangeInOutput() throws Exception { + // GIVEN + byte[] expected = new byte[]{1, 2, 3, 4, 5}; + SkippingOutputStream testSubject = new SkippingOutputStream(1, outputStream); + + // WHEN + testSubject.write(expected, 3, 2); + + // THEN + assertEquals(1, outputStream.toByteArray().length); + assertArrayEquals(new byte[]{5}, outputStream.toByteArray()); + } + + @Test + public void givenSkipGreaterThanByteArrayRange_whenIWriteArrayRange_thenIGetNoOutput() throws Exception { + // GIVEN + byte[] array = new byte[]{1, 2, 3, 4, 5}; + SkippingOutputStream testSubject = new SkippingOutputStream(10, outputStream); + + // WHEN + testSubject.write(array, 3, 2); + + // THEN + assertEquals(0, outputStream.toByteArray().length); + } + + @Test + public void givenSkipGreaterThanByteArrayRange_whenIWriteArrayRangeTwice_thenIGetExpectedOutput() throws Exception { + // GIVEN + byte[] array = new byte[]{1, 2, 3, 4, 5}; + SkippingOutputStream testSubject = new SkippingOutputStream(3, outputStream); + + // WHEN + testSubject.write(array, 3, 2); + testSubject.write(array, 3, 2); + + // THEN + assertEquals(1, outputStream.toByteArray().length); + assertArrayEquals(new byte[]{5}, outputStream.toByteArray()); + } +} \ No newline at end of file diff --git a/libsignal/service/src/test/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBodyTest.java b/libsignal/service/src/test/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBodyTest.java new file mode 100644 index 0000000000..189e4e7215 --- /dev/null +++ b/libsignal/service/src/test/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBodyTest.java @@ -0,0 +1,73 @@ +package org.whispersystems.signalservice.internal.push.http; + +import org.junit.Test; +import org.whispersystems.signalservice.api.crypto.AttachmentCipherOutputStream; +import org.whispersystems.signalservice.internal.util.Util; + +import java.io.ByteArrayInputStream; + +import okio.Buffer; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class DigestingRequestBodyTest { + + private static int CONTENT_LENGTH = 70000; + private static int TOTAL_LENGTH = (int) AttachmentCipherOutputStream.getCiphertextLength(CONTENT_LENGTH); + + private final byte[] attachmentKey = Util.getSecretBytes(64); + private final byte[] attachmentIV = Util.getSecretBytes(16); + private final byte[] input = Util.getSecretBytes(CONTENT_LENGTH); + + private final OutputStreamFactory outputStreamFactory = new AttachmentCipherOutputStreamFactory(attachmentKey, attachmentIV); + + @Test + public void givenSameKeyAndIV_whenIWriteToBuffer_thenIExpectSameTransmittedDigest() throws Exception { + DigestingRequestBody fromStart = getBody(0); + DigestingRequestBody fromMiddle = getBody(CONTENT_LENGTH / 2); + + try (Buffer buffer = new Buffer()) { + fromStart.writeTo(buffer); + } + + try (Buffer buffer = new Buffer()) { + fromMiddle.writeTo(buffer); + } + + assertArrayEquals(fromStart.getTransmittedDigest(), fromMiddle.getTransmittedDigest()); + } + + @Test + public void givenSameKeyAndIV_whenIWriteToBuffer_thenIExpectSameContents() throws Exception { + DigestingRequestBody fromStart = getBody(0); + DigestingRequestBody fromMiddle = getBody(CONTENT_LENGTH / 2); + + byte[] cipher1; + + try (Buffer buffer = new Buffer()) { + fromStart.writeTo(buffer); + + cipher1 = buffer.readByteArray(); + } + + byte[] cipher2; + + try (Buffer buffer = new Buffer()) { + fromMiddle.writeTo(buffer); + + cipher2 = buffer.readByteArray(); + } + + assertEquals(cipher1.length, TOTAL_LENGTH); + assertEquals(cipher2.length, TOTAL_LENGTH - (CONTENT_LENGTH / 2)); + + for (int i = 0; i < cipher2.length; i++) { + assertEquals(cipher2[i], cipher1[i + (CONTENT_LENGTH / 2)]); + } + } + + private DigestingRequestBody getBody(long contentStart) { + return new DigestingRequestBody(new ByteArrayInputStream(input), outputStreamFactory, "application/octet", CONTENT_LENGTH, (a, b) -> {}, () -> false, contentStart); + } +} \ No newline at end of file