diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index e5c270b4a2..a843d68134 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -36,7 +36,7 @@ import org.session.libsession.messaging.file_server.FileServerAPI; import org.session.libsession.messaging.mentions.MentionsManager; import org.session.libsession.messaging.open_groups.OpenGroupAPI; import org.session.libsession.messaging.sending_receiving.notifications.MessageNotifier; -import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller; +import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPollerV2; import org.session.libsession.messaging.sending_receiving.pollers.Poller; import org.session.libsession.snode.SnodeModule; import org.session.libsession.utilities.Address; @@ -131,7 +131,6 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc // Loki public MessageNotifier messageNotifier = null; public Poller poller = null; - public ClosedGroupPoller closedGroupPoller = null; public Broadcaster broadcaster = null; public SignalCommunicationModule communicationModule; private Job firebaseInstanceIdJob; @@ -220,9 +219,7 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc if (poller != null) { poller.stopIfNeeded(); } - if (closedGroupPoller != null) { - closedGroupPoller.stopIfNeeded(); - } + ClosedGroupPollerV2.getShared().stop(); } @Override @@ -448,7 +445,6 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc return; } poller = new Poller(); - closedGroupPoller = new ClosedGroupPoller(); } public void startPollingIfNeeded() { @@ -456,9 +452,7 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc if (poller != null) { poller.startIfNeeded(); } - if (closedGroupPoller != null) { - closedGroupPoller.startIfNeeded(); - } + ClosedGroupPollerV2.getShared().start(); } private void resubmitProfilePictureIfNeeded() { diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index 167bd522a0..a60807b2e9 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -603,6 +603,11 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, return if (threadID < 0) null else threadID } + fun foo() { + val threadDB = DatabaseFactory.getThreadDatabase(context) + + } + override fun getThreadIdForMms(mmsId: Long): Long { val mmsDb = DatabaseFactory.getMmsDatabase(context) val cursor = mmsDb.getMessage(mmsId) @@ -670,10 +675,15 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, threadDatabase.getOrCreateThreadIdFor(recipient) } if (contacts.isNotEmpty()) { - threadDatabase.notifyUpdatedFromConfig() + threadDatabase.notifyConversationListListeners() } } + override fun getLastUpdated(threadID: Long): Long { + val threadDB = DatabaseFactory.getThreadDatabase(context) + return threadDB.getLastUpdated(threadID) + } + override fun getAttachmentDataUri(attachmentId: AttachmentId): Uri { return PartAuthority.getAttachmentDataUri(attachmentId) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java b/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java index 98a6cf667e..ebb0dbab26 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java +++ b/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java @@ -64,7 +64,7 @@ public class ThreadDatabase extends Database { private static final String TAG = ThreadDatabase.class.getSimpleName(); - private Map addressCache = new HashMap<>(); + private final Map addressCache = new HashMap<>(); public static final String TABLE_NAME = "thread"; public static final String ID = "_id"; @@ -404,6 +404,21 @@ public class ThreadDatabase extends Database { } } + public Long getLastUpdated(long threadId) { + SQLiteDatabase db = databaseHelper.getReadableDatabase(); + Cursor cursor = db.query(TABLE_NAME, new String[]{DATE}, ID_WHERE, new String[]{String.valueOf(threadId)}, null, null, null); + + try { + if (cursor != null && cursor.moveToFirst()) { + return cursor.getLong(0); + } + + return -1L; + } finally { + if (cursor != null) cursor.close(); + } + } + public void deleteConversation(long threadId) { DatabaseFactory.getSmsDatabase(context).deleteThread(threadId); DatabaseFactory.getMmsDatabase(context).deleteThread(threadId); @@ -471,7 +486,6 @@ public class ThreadDatabase extends Database { } public @Nullable Recipient getRecipientForThreadId(long threadId) { - // Loki - Cache the address if (addressCache.containsKey(threadId) && addressCache.get(threadId) != null) { return Recipient.from(context, addressCache.get(threadId), false); } @@ -505,10 +519,6 @@ public class ThreadDatabase extends Database { notifyConversationListeners(threadId); } - public void notifyUpdatedFromConfig() { - notifyConversationListListeners(); - } - public boolean update(long threadId, boolean unarchive) { MmsSmsDatabase mmsSmsDatabase = DatabaseFactory.getMmsSmsDatabase(context); long count = mmsSmsDatabase.getConversationCount(threadId); diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt index fc3e7c1bba..d070c25f2f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt @@ -7,9 +7,9 @@ import androidx.work.* import nl.komponents.kovenant.Promise import nl.komponents.kovenant.all import nl.komponents.kovenant.functional.map +import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.jobs.MessageReceiveJob -import org.session.libsession.messaging.open_groups.OpenGroupV2 -import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller +import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPollerV2 import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPollerV2 import org.session.libsession.snode.SnodeAPI import org.session.libsession.utilities.TextSecurePreferences @@ -57,7 +57,10 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor promises.addAll(dmsPromise.get()) // Closed groups - promises.addAll(ClosedGroupPoller().pollOnce()) + val closedGroupPoller = ClosedGroupPollerV2() // Intentionally don't use shared + val storage = MessagingModuleConfiguration.shared.storage + val allGroupPublicKeys = storage.getAllClosedGroupPublicKeys() + allGroupPublicKeys.forEach { closedGroupPoller.poll(it) } // Open Groups val threadDB = DatabaseFactory.getLokiThreadDatabase(context) diff --git a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt index 9b1956890f..cc30b8c7cb 100644 --- a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt @@ -142,6 +142,7 @@ interface StorageProtocol { fun getOrCreateThreadIdFor(publicKey: String, groupPublicKey: String?, openGroupID: String?): Long fun getThreadIdFor(address: Address): Long? fun getThreadIdForMms(mmsId: Long): Long + fun getLastUpdated(threadID: Long): Long // Session Request fun getSessionRequestSentTimestamp(publicKey: String): Long? diff --git a/libsession/src/main/java/org/session/libsession/messaging/MessagingModuleConfiguration.kt b/libsession/src/main/java/org/session/libsession/messaging/MessagingModuleConfiguration.kt index 20b8d49d14..03349369dc 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/MessagingModuleConfiguration.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/MessagingModuleConfiguration.kt @@ -5,9 +5,9 @@ import org.session.libsession.database.MessageDataProvider import org.session.libsession.database.StorageProtocol class MessagingModuleConfiguration( - val context: Context, - val storage: StorageProtocol, - val messageDataProvider: MessageDataProvider + val context: Context, + val storage: StorageProtocol, + val messageDataProvider: MessageDataProvider ) { companion object { diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSenderClosedGroupHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSenderClosedGroupHandler.kt index aceeda6635..049514efb1 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSenderClosedGroupHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSenderClosedGroupHandler.kt @@ -9,6 +9,7 @@ import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.messages.control.ClosedGroupControlMessage import org.session.libsession.messaging.sending_receiving.MessageSender.Error import org.session.libsession.messaging.sending_receiving.notifications.PushNotificationAPI +import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPollerV2 import org.session.libsession.utilities.Address import org.session.libsession.utilities.GroupUtil import org.session.libsession.utilities.TextSecurePreferences @@ -71,6 +72,8 @@ fun MessageSender.create(name: String, members: Collection): Promise> { - if (isPolling) { return listOf() } - isPolling = true - return poll() - } - - public fun stopIfNeeded() { - isPolling = false - handler.removeCallbacks(task) - } - // endregion - - // region Private API - private fun poll(): List> { - if (!isPolling) { return listOf() } - val storage = MessagingModuleConfiguration.shared.storage - val publicKeys = storage.getAllActiveClosedGroupPublicKeys() - return publicKeys.map { publicKey -> - val promise = SnodeAPI.getSwarm(publicKey).bind { swarm -> - val snode = swarm.getRandomElementOrNull() ?: throw InsufficientSnodesException() // Should be cryptographically secure - if (!isPolling) { throw PollingCanceledException() } - SnodeAPI.getRawMessages(snode, publicKey).map {SnodeAPI.parseRawMessagesResponse(it, snode, publicKey) } - } - promise.successBackground { messages -> - if (!storage.isGroupActive(publicKey)) { return@successBackground } - messages.forEach { envelope -> - val job = MessageReceiveJob(envelope.toByteArray()) - JobQueue.shared.add(job) - } - } - promise.fail { - Log.d("Loki", "Polling failed for closed group with public key: $publicKey due to error: $it.") - } - promise.map { } - } - } - // endregion -} diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPollerV2.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPollerV2.kt new file mode 100644 index 0000000000..c12021447d --- /dev/null +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPollerV2.kt @@ -0,0 +1,115 @@ +package org.session.libsession.messaging.sending_receiving.pollers + +import nl.komponents.kovenant.Promise +import nl.komponents.kovenant.functional.bind +import nl.komponents.kovenant.functional.map +import org.session.libsession.messaging.MessagingModuleConfiguration +import org.session.libsession.messaging.jobs.JobQueue +import org.session.libsession.messaging.jobs.MessageReceiveJob +import org.session.libsession.snode.SnodeAPI +import org.session.libsession.utilities.GroupUtil +import org.session.libsignal.crypto.getRandomElementOrNull +import org.session.libsignal.utilities.Log +import org.session.libsignal.utilities.successBackground +import java.util.* +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.TimeUnit +import kotlin.math.min + +class ClosedGroupPollerV2 { + private val executorService = Executors.newScheduledThreadPool(4) + private var isPolling = mutableMapOf() + private var futures = mutableMapOf>() + + private fun isPolling(groupPublicKey: String): Boolean { + return isPolling[groupPublicKey] ?: false + } + + companion object { + private val minPollInterval = 4 * 1000 + private val maxPollInterval = 2 * 60 * 1000 + + @JvmStatic + val shared = ClosedGroupPollerV2() + } + + class InsufficientSnodesException() : Exception("No snodes left to poll.") + class PollingCanceledException() : Exception("Polling canceled.") + + fun start() { + val storage = MessagingModuleConfiguration.shared.storage + val allGroupPublicKeys = storage.getAllClosedGroupPublicKeys() + allGroupPublicKeys.forEach { startPolling(it) } + } + + fun startPolling(groupPublicKey: String) { + if (isPolling(groupPublicKey)) { return } + setUpPolling(groupPublicKey) + isPolling[groupPublicKey] = true + } + + fun stop() { + val storage = MessagingModuleConfiguration.shared.storage + val allGroupPublicKeys = storage.getAllClosedGroupPublicKeys() + allGroupPublicKeys.forEach { stopPolling(it) } + } + + fun stopPolling(groupPublicKey: String) { + futures[groupPublicKey]?.cancel(false) + isPolling[groupPublicKey] = false + } + + private fun setUpPolling(groupPublicKey: String) { + poll(groupPublicKey).success { + pollRecursively(groupPublicKey) + }.fail { + // The error is logged in poll(_:) + pollRecursively(groupPublicKey) + } + } + + private fun pollRecursively(groupPublicKey: String) { + if (!isPolling(groupPublicKey)) { return } + // Get the received date of the last message in the thread. If we don't have any messages yet, pick some + // reasonable fake time interval to use instead. + val storage = MessagingModuleConfiguration.shared.storage + val groupID = GroupUtil.doubleEncodeGroupID(groupPublicKey) + val threadID = storage.getThreadID(groupID)?.toLongOrNull() ?: return + val lastUpdated = storage.getLastUpdated(threadID) + val timeSinceLastMessage = if (lastUpdated != -1L) Date().time - lastUpdated else 5 * 60 * 1000 + val minPollInterval = Companion.minPollInterval + val limit: Long = 12 * 60 * 60 * 1000 + val a = (Companion.maxPollInterval - minPollInterval).toDouble() / limit.toDouble() + val nextPollInterval = a * min(timeSinceLastMessage, limit) + minPollInterval + Log.d("Loki", "Next poll interval for closed group with public key: $groupPublicKey is ${nextPollInterval / 1000} s.") + executorService?.schedule({ + poll(groupPublicKey).success { + pollRecursively(groupPublicKey) + }.fail { + // The error is logged in poll(_:) + pollRecursively(groupPublicKey) + } + }, nextPollInterval.toLong(), TimeUnit.MILLISECONDS) + } + + fun poll(groupPublicKey: String): Promise { + if (!isPolling(groupPublicKey)) { return Promise.of(Unit) } + val promise = SnodeAPI.getSwarm(groupPublicKey).bind { swarm -> + val snode = swarm.getRandomElementOrNull() ?: throw InsufficientSnodesException() // Should be cryptographically secure + if (!isPolling(groupPublicKey)) { throw PollingCanceledException() } + SnodeAPI.getRawMessages(snode, groupPublicKey).map { SnodeAPI.parseRawMessagesResponse(it, snode, groupPublicKey) } + } + promise.success { envelopes -> + if (!isPolling(groupPublicKey)) { return@success } + envelopes.forEach { envelope -> + val job = MessageReceiveJob(envelope.toByteArray()) + JobQueue.shared.add(job) + } + } + promise.fail { + Log.d("Loki", "Polling failed for closed group with public key: $groupPublicKey due to error: $it.") + } + return promise.map { } + } +}