Update background polling for SSKs

This commit is contained in:
nielsandriesse 2020-09-16 13:42:18 +10:00
parent 71001c4b5c
commit 2ee8bd8959
11 changed files with 162 additions and 84 deletions

View File

@ -680,7 +680,7 @@
</intent-filter>
</receiver>
<!-- Session -->
<receiver android:name="org.thoughtcrime.securesms.loki.api.BackgroundPollWorker">
<receiver android:name="org.thoughtcrime.securesms.loki.api.BackgroundPollListener" >
<intent-filter>
<action android:name="android.intent.action.BOOT_COMPLETED" />
</intent-filter>
@ -694,7 +694,6 @@
<service
android:name="org.thoughtcrime.securesms.jobmanager.KeepAliveService"
android:enabled="@bool/enable_alarm_manager" />
<receiver
android:name="org.thoughtcrime.securesms.jobmanager.AlarmManagerScheduler$RetryReceiver"
android:enabled="@bool/enable_alarm_manager" /> <!-- Probably don't need this one -->

View File

@ -16,16 +16,17 @@
*/
package org.thoughtcrime.securesms;
import androidx.lifecycle.DefaultLifecycleObserver;
import androidx.lifecycle.LifecycleOwner;
import androidx.lifecycle.ProcessLifecycleOwner;
import android.content.Context;
import android.content.Intent;
import android.os.AsyncTask;
import android.os.Build;
import android.os.Handler;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.lifecycle.DefaultLifecycleObserver;
import androidx.lifecycle.LifecycleOwner;
import androidx.lifecycle.ProcessLifecycleOwner;
import androidx.multidex.MultiDexApplication;
import com.google.firebase.iid.FirebaseInstanceId;
@ -60,7 +61,7 @@ import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.logging.PersistentLogger;
import org.thoughtcrime.securesms.logging.UncaughtExceptionLogger;
import org.thoughtcrime.securesms.loki.activities.HomeActivity;
import org.thoughtcrime.securesms.loki.api.BackgroundPollWorker;
import org.thoughtcrime.securesms.loki.api.BackgroundPollListener;
import org.thoughtcrime.securesms.loki.api.ClosedGroupPoller;
import org.thoughtcrime.securesms.loki.api.LokiPushNotificationManager;
import org.thoughtcrime.securesms.loki.api.PublicChatManager;
@ -115,10 +116,10 @@ import org.whispersystems.signalservice.loki.protocol.closedgroups.SharedSenderK
import org.whispersystems.signalservice.loki.protocol.mentions.MentionsManager;
import org.whispersystems.signalservice.loki.protocol.meta.SessionMetaProtocol;
import org.whispersystems.signalservice.loki.protocol.meta.TTLUtilities;
import org.whispersystems.signalservice.loki.protocol.shelved.multidevice.DeviceLink;
import org.whispersystems.signalservice.loki.protocol.shelved.multidevice.MultiDeviceProtocol;
import org.whispersystems.signalservice.loki.protocol.sessionmanagement.SessionManagementProtocol;
import org.whispersystems.signalservice.loki.protocol.sessionmanagement.SessionManagementProtocolDelegate;
import org.whispersystems.signalservice.loki.protocol.shelved.multidevice.DeviceLink;
import org.whispersystems.signalservice.loki.protocol.shelved.multidevice.MultiDeviceProtocol;
import org.whispersystems.signalservice.loki.protocol.shelved.syncmessages.SyncMessagesProtocol;
import java.io.File;
@ -383,7 +384,7 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc
RotateSignedPreKeyListener.schedule(this);
LocalBackupListener.schedule(this);
RotateSenderCertificateListener.schedule(this);
BackgroundPollWorker.schedule(this); // Loki
BackgroundPollListener.schedule(this); // Loki
if (BuildConfig.PLAY_STORE_DISABLED) {
UpdateApkRefreshListener.schedule(this);
@ -513,9 +514,9 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc
Context context = this;
SwarmAPI.Companion.configureIfNeeded(apiDB);
SnodeAPI.Companion.configureIfNeeded(userPublicKey, apiDB, broadcaster);
poller = new Poller(userPublicKey, apiDB, protos -> {
for (SignalServiceProtos.Envelope proto : protos) {
new PushContentReceiveJob(context).processEnvelope(new SignalServiceEnvelope(proto), false);
poller = new Poller(userPublicKey, apiDB, envelopes -> {
for (SignalServiceProtos.Envelope envelope : envelopes) {
new PushContentReceiveJob(context).processEnvelope(new SignalServiceEnvelope(envelope), false);
}
return Unit.INSTANCE;
});

View File

@ -44,6 +44,7 @@ import org.thoughtcrime.securesms.jobs.SmsSentJob;
import org.thoughtcrime.securesms.jobs.TrimThreadJob;
import org.thoughtcrime.securesms.jobs.TypingSendJob;
import org.thoughtcrime.securesms.jobs.UpdateApkJob;
import org.thoughtcrime.securesms.loki.api.BackgroundPollJob;
import org.thoughtcrime.securesms.loki.protocol.ClosedGroupUpdateMessageSendJob;
import org.thoughtcrime.securesms.loki.protocol.NullMessageSendJob;
@ -85,6 +86,7 @@ public class WorkManagerFactoryMappings {
put(RetrieveProfileAvatarJob.class.getName(), RetrieveProfileAvatarJob.KEY);
put(RetrieveProfileJob.class.getName(), RetrieveProfileJob.KEY);
put(RotateCertificateJob.class.getName(), RotateCertificateJob.KEY);
put(BackgroundPollJob.class.getName(), BackgroundPollJob.KEY);
put(RotateProfileKeyJob.class.getName(), RotateProfileKeyJob.KEY);
put(RotateSignedPreKeyJob.class.getName(), RotateSignedPreKeyJob.KEY);
put(SendDeliveryReceiptJob.class.getName(), SendDeliveryReceiptJob.KEY);

View File

@ -13,6 +13,7 @@ import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraintObserver;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkOrCellServiceConstraint;
import org.thoughtcrime.securesms.jobmanager.impl.SqlCipherMigrationConstraint;
import org.thoughtcrime.securesms.jobmanager.impl.SqlCipherMigrationConstraintObserver;
import org.thoughtcrime.securesms.loki.api.BackgroundPollJob;
import org.thoughtcrime.securesms.loki.protocol.ClosedGroupUpdateMessageSendJob;
import org.thoughtcrime.securesms.loki.protocol.shelved.MultiDeviceOpenGroupUpdateJob;
import org.thoughtcrime.securesms.loki.protocol.NullMessageSendJob;
@ -61,6 +62,7 @@ public final class JobManagerFactories {
put(RetrieveProfileAvatarJob.KEY, new RetrieveProfileAvatarJob.Factory(application));
put(RetrieveProfileJob.KEY, new RetrieveProfileJob.Factory(application));
put(RotateCertificateJob.KEY, new RotateCertificateJob.Factory());
put(BackgroundPollJob.KEY, new BackgroundPollJob.Factory());
put(RotateProfileKeyJob.KEY, new RotateProfileKeyJob.Factory());
put(RotateSignedPreKeyJob.KEY, new RotateSignedPreKeyJob.Factory());
put(SendDeliveryReceiptJob.KEY, new SendDeliveryReceiptJob.Factory());

View File

@ -0,0 +1,80 @@
package org.thoughtcrime.securesms.loki.api
import android.content.Context
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.functional.map
import org.thoughtcrime.securesms.ApplicationContext
import org.thoughtcrime.securesms.database.DatabaseFactory
import org.thoughtcrime.securesms.dependencies.InjectableType
import org.thoughtcrime.securesms.jobmanager.Data
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobs.BaseJob
import org.thoughtcrime.securesms.jobs.PushContentReceiveJob
import org.thoughtcrime.securesms.jobs.RotateCertificateJob
import org.thoughtcrime.securesms.logging.Log
import org.thoughtcrime.securesms.util.TextSecurePreferences
import org.whispersystems.signalservice.api.SignalServiceAccountManager
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope
import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException
import org.whispersystems.signalservice.loki.api.SnodeAPI
import java.io.IOException
import java.util.concurrent.TimeUnit
import javax.inject.Inject
class BackgroundPollJob private constructor(parameters: Parameters) : BaseJob(parameters) {
companion object {
const val KEY = "BackgroundPollJob"
}
constructor(context: Context) : this(Parameters.Builder()
.addConstraint(NetworkConstraint.KEY)
.setQueue(KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED)
.build()) {
setContext(context)
}
override fun serialize(): Data {
return Data.EMPTY
}
override fun getFactoryKey(): String { return KEY }
public override fun onRun() {
try {
val userPublicKey = TextSecurePreferences.getLocalNumber(context)
val promises = mutableListOf<Promise<Unit, Exception>>()
val promise = SnodeAPI.shared.getMessages(userPublicKey).map { envelopes ->
envelopes.forEach {
PushContentReceiveJob(context).processEnvelope(SignalServiceEnvelope(it), false)
}
}
promises.add(promise)
promises.addAll(ClosedGroupPoller.shared.pollOnce())
val openGroups = DatabaseFactory.getLokiThreadDatabase(context).getAllPublicChats().map { it.value }
for (openGroup in openGroups) {
val poller = PublicChatPoller(context, openGroup)
poller.stop()
promises.add(poller.pollForNewMessages())
}
} catch (exception: Exception) {
Log.d("Loki", "Background poll failed due to error: $exception.")
}
}
public override fun onShouldRetry(e: Exception): Boolean {
return false
}
override fun onCanceled() { }
class Factory : Job.Factory<BackgroundPollJob> {
override fun create(parameters: Parameters, data: Data): BackgroundPollJob {
return BackgroundPollJob(parameters)
}
}
}

View File

@ -0,0 +1,36 @@
package org.thoughtcrime.securesms.loki.api
import android.content.Context
import android.content.Intent
import nl.komponents.kovenant.functional.map
import org.thoughtcrime.securesms.ApplicationContext
import org.thoughtcrime.securesms.database.DatabaseFactory
import org.thoughtcrime.securesms.jobs.PushContentReceiveJob
import org.thoughtcrime.securesms.service.PersistentAlarmManagerListener
import org.thoughtcrime.securesms.util.TextSecurePreferences
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope
import org.whispersystems.signalservice.loki.api.SnodeAPI
import java.util.concurrent.TimeUnit
class BackgroundPollListener : PersistentAlarmManagerListener() {
companion object {
private val pollInterval = TimeUnit.MINUTES.toMillis(30)
@JvmStatic
fun schedule(context: Context) {
BackgroundPollListener().onReceive(context, Intent())
}
}
override fun getNextScheduledExecutionTime(context: Context): Long {
return TextSecurePreferences.getBackgroundPollTime(context)
}
override fun onAlarm(context: Context, scheduledTime: Long): Long {
ApplicationContext.getInstance(context).jobManager.add(BackgroundPollJob(context))
val nextTime = System.currentTimeMillis() + pollInterval
TextSecurePreferences.setBackgroundPollTime(context, nextTime)
return nextTime
}
}

View File

@ -1,59 +0,0 @@
package org.thoughtcrime.securesms.loki.api
import android.content.Context
import android.content.Intent
import nl.komponents.kovenant.functional.map
import org.thoughtcrime.securesms.ApplicationContext
import org.thoughtcrime.securesms.database.DatabaseFactory
import org.thoughtcrime.securesms.jobs.PushContentReceiveJob
import org.thoughtcrime.securesms.service.PersistentAlarmManagerListener
import org.thoughtcrime.securesms.util.TextSecurePreferences
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope
import org.whispersystems.signalservice.loki.api.SnodeAPI
import java.util.concurrent.TimeUnit
class BackgroundPollWorker : PersistentAlarmManagerListener() {
companion object {
private val pollInterval = TimeUnit.MINUTES.toMillis(30)
@JvmStatic
fun schedule(context: Context) {
BackgroundPollWorker().onReceive(context, Intent())
}
}
override fun getNextScheduledExecutionTime(context: Context): Long {
return TextSecurePreferences.getBackgroundPollTime(context)
}
override fun onAlarm(context: Context, scheduledTime: Long): Long {
if (scheduledTime != 0L) {
if (!TextSecurePreferences.isUsingFCM(context)) {
val userPublicKey = TextSecurePreferences.getLocalNumber(context)
val lokiAPIDatabase = DatabaseFactory.getLokiAPIDatabase(context)
try {
val applicationContext = context.applicationContext as ApplicationContext
val broadcaster = applicationContext.broadcaster
SnodeAPI.configureIfNeeded(userPublicKey, lokiAPIDatabase, broadcaster)
SnodeAPI.shared.getMessages(userPublicKey).map { messages ->
messages.forEach {
PushContentReceiveJob(context).processEnvelope(SignalServiceEnvelope(it), false)
}
}
} catch (exception: Throwable) {
// Do nothing
}
}
val openGroups = DatabaseFactory.getLokiThreadDatabase(context).getAllPublicChats().map { it.value }
for (openGroup in openGroups) {
val poller = PublicChatPoller(context, openGroup)
poller.stop()
poller.pollForNewMessages()
}
}
val nextTime = System.currentTimeMillis() + pollInterval
TextSecurePreferences.setBackgroundPollTime(context, nextTime)
return nextTime
}
}

View File

@ -2,6 +2,7 @@ package org.thoughtcrime.securesms.loki.api
import android.content.Context
import android.os.Handler
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map
import org.thoughtcrime.securesms.jobs.PushContentReceiveJob
@ -9,6 +10,7 @@ import org.thoughtcrime.securesms.logging.Log
import org.thoughtcrime.securesms.loki.database.SharedSenderKeysDatabase
import org.thoughtcrime.securesms.loki.utilities.successBackground
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope
import org.whispersystems.signalservice.internal.push.SignalServiceProtos
import org.whispersystems.signalservice.loki.api.SnodeAPI
import org.whispersystems.signalservice.loki.api.SwarmAPI
import org.whispersystems.signalservice.loki.utilities.getRandomElementOrNull
@ -50,6 +52,12 @@ class ClosedGroupPoller private constructor(private val context: Context, privat
task.run()
}
public fun pollOnce(): List<Promise<Unit, Exception>> {
if (isPolling) { return listOf() }
isPolling = true
return poll()
}
public fun stopIfNeeded() {
isPolling = false
handler.removeCallbacks(task)
@ -57,24 +65,27 @@ class ClosedGroupPoller private constructor(private val context: Context, privat
// endregion
// region Private API
private fun poll() {
if (!isPolling) { return }
private fun poll(): List<Promise<Unit, Exception>> {
if (!isPolling) { return listOf() }
val publicKeys = database.getAllClosedGroupPublicKeys()
publicKeys.forEach { publicKey ->
SwarmAPI.shared.getSwarm(publicKey).bind { swarm ->
return publicKeys.map { publicKey ->
val promise = SwarmAPI.shared.getSwarm(publicKey).bind { swarm ->
val snode = swarm.getRandomElementOrNull() ?: throw InsufficientSnodesException() // Should be cryptographically secure
if (!isPolling) { throw PollingCanceledException() }
SnodeAPI.shared.getRawMessages(snode, publicKey).map {SnodeAPI.shared.parseRawMessagesResponse(it, snode, publicKey) }
}.successBackground { messages ->
}
promise.successBackground { messages ->
if (messages.isNotEmpty()) {
Log.d("Loki", "Received ${messages.count()} new message(s) in closed group with public key: $publicKey.")
}
messages.forEach {
PushContentReceiveJob(context).processEnvelope(SignalServiceEnvelope(it), false)
}
}.fail {
}
promise.fail {
Log.d("Loki", "Polling failed for closed group with public key: $publicKey due to error: $it.")
}
promise.map { Unit }
}
}
// endregion

View File

@ -5,6 +5,7 @@ import android.os.Handler
import android.util.Log
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map
import org.thoughtcrime.securesms.ApplicationContext
import org.thoughtcrime.securesms.crypto.IdentityKeyUtil
import org.thoughtcrime.securesms.database.Address
@ -156,7 +157,7 @@ class PublicChatPoller(private val context: Context, private val group: PublicCh
return SignalServiceDataMessage(message.timestamp, serviceGroup, attachments, body, false, 0, false, null, false, quote, null, signalLinkPreviews, null)
}
fun pollForNewMessages() {
fun pollForNewMessages(): Promise<Unit, Exception> {
fun processIncomingMessage(message: PublicChatMessage) {
// If the sender of the current message is not a slave device, set the display name in the database
val masterHexEncodedPublicKey = MultiDeviceProtocol.shared.getMasterDevice(message.senderPublicKey)
@ -217,7 +218,7 @@ class PublicChatPoller(private val context: Context, private val group: PublicCh
}
}
}
if (isPollOngoing) { return }
if (isPollOngoing) { return Promise.of(Unit) }
isPollOngoing = true
val userDevices = MultiDeviceProtocol.shared.getAllLinkedDevices(userHexEncodedPublicKey)
var uniqueDevices = setOf<String>()
@ -225,7 +226,7 @@ class PublicChatPoller(private val context: Context, private val group: PublicCh
val apiDB = DatabaseFactory.getLokiAPIDatabase(context)
FileServerAPI.configure(userHexEncodedPublicKey, userPrivateKey, apiDB)
// Kovenant propagates a context to chained promises, so LokiPublicChatAPI.sharedContext should be used for all of the below
api.getMessages(group.channel, group.server).bind(PublicChatAPI.sharedContext) { messages ->
val promise = api.getMessages(group.channel, group.server).bind(PublicChatAPI.sharedContext) { messages ->
/*
if (messages.isNotEmpty()) {
// We need to fetch the device mapping for any devices we don't have
@ -237,7 +238,8 @@ class PublicChatPoller(private val context: Context, private val group: PublicCh
}
*/
Promise.of(messages)
}.successBackground {
}
promise.successBackground {
/*
val newDisplayNameUpdatees = uniqueDevices.mapNotNull {
// This will return null if the current device is a master device
@ -246,7 +248,8 @@ class PublicChatPoller(private val context: Context, private val group: PublicCh
// Fetch the display names of the master devices
displayNameUpdatees = displayNameUpdatees.union(newDisplayNameUpdatees)
*/
}.successBackground { messages ->
}
promise.successBackground { messages ->
// Process messages in the background
messages.forEach { message ->
if (userDevices.contains(message.senderPublicKey)) {
@ -257,10 +260,12 @@ class PublicChatPoller(private val context: Context, private val group: PublicCh
}
isCaughtUp = true
isPollOngoing = false
}.fail {
}
promise.fail {
Log.d("Loki", "Failed to get messages for group chat with ID: ${group.channel} on server: ${group.server}.")
isPollOngoing = false
}
return promise.map { Unit }
}
private fun pollForDisplayNames() {

View File

@ -26,7 +26,7 @@ class PushNotificationService : FirebaseMessagingService() {
val envelope = MessageWrapper.unwrap(data)
PushContentReceiveJob(this).processEnvelope(SignalServiceEnvelope(envelope), true)
} catch (e: Exception) {
Log.d("Loki", "Failed to unwrap data for message.")
Log.d("Loki", "Failed to unwrap data for message due to error: $e.")
}
} else {
Log.d("Loki", "Failed to decode data for message.")

View File

@ -13,6 +13,7 @@ public abstract class PersistentAlarmManagerListener extends BroadcastReceiver {
private static final String TAG = PersistentAlarmManagerListener.class.getSimpleName();
protected abstract long getNextScheduledExecutionTime(Context context);
protected abstract long onAlarm(Context context, long scheduledTime);
@Override