Merge branch 'dev' of https://github.com/oxen-io/session-android into fix-open-group-spam

This commit is contained in:
Ryan ZHAO 2021-05-21 14:55:15 +10:00
commit 953f6d8c17
10 changed files with 163 additions and 104 deletions

View File

@ -36,7 +36,7 @@ import org.session.libsession.messaging.file_server.FileServerAPI;
import org.session.libsession.messaging.mentions.MentionsManager; import org.session.libsession.messaging.mentions.MentionsManager;
import org.session.libsession.messaging.open_groups.OpenGroupAPI; import org.session.libsession.messaging.open_groups.OpenGroupAPI;
import org.session.libsession.messaging.sending_receiving.notifications.MessageNotifier; import org.session.libsession.messaging.sending_receiving.notifications.MessageNotifier;
import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller; import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPollerV2;
import org.session.libsession.messaging.sending_receiving.pollers.Poller; import org.session.libsession.messaging.sending_receiving.pollers.Poller;
import org.session.libsession.snode.SnodeModule; import org.session.libsession.snode.SnodeModule;
import org.session.libsession.utilities.Address; import org.session.libsession.utilities.Address;
@ -131,7 +131,6 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc
// Loki // Loki
public MessageNotifier messageNotifier = null; public MessageNotifier messageNotifier = null;
public Poller poller = null; public Poller poller = null;
public ClosedGroupPoller closedGroupPoller = null;
public Broadcaster broadcaster = null; public Broadcaster broadcaster = null;
public SignalCommunicationModule communicationModule; public SignalCommunicationModule communicationModule;
private Job firebaseInstanceIdJob; private Job firebaseInstanceIdJob;
@ -220,9 +219,7 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc
if (poller != null) { if (poller != null) {
poller.stopIfNeeded(); poller.stopIfNeeded();
} }
if (closedGroupPoller != null) { ClosedGroupPollerV2.getShared().stop();
closedGroupPoller.stopIfNeeded();
}
} }
@Override @Override
@ -448,7 +445,6 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc
return; return;
} }
poller = new Poller(); poller = new Poller();
closedGroupPoller = new ClosedGroupPoller();
} }
public void startPollingIfNeeded() { public void startPollingIfNeeded() {
@ -456,9 +452,7 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc
if (poller != null) { if (poller != null) {
poller.startIfNeeded(); poller.startIfNeeded();
} }
if (closedGroupPoller != null) { ClosedGroupPollerV2.getShared().start();
closedGroupPoller.startIfNeeded();
}
} }
private void resubmitProfilePictureIfNeeded() { private void resubmitProfilePictureIfNeeded() {

View File

@ -603,6 +603,11 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
return if (threadID < 0) null else threadID return if (threadID < 0) null else threadID
} }
fun foo() {
val threadDB = DatabaseFactory.getThreadDatabase(context)
}
override fun getThreadIdForMms(mmsId: Long): Long { override fun getThreadIdForMms(mmsId: Long): Long {
val mmsDb = DatabaseFactory.getMmsDatabase(context) val mmsDb = DatabaseFactory.getMmsDatabase(context)
val cursor = mmsDb.getMessage(mmsId) val cursor = mmsDb.getMessage(mmsId)
@ -670,10 +675,15 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
threadDatabase.getOrCreateThreadIdFor(recipient) threadDatabase.getOrCreateThreadIdFor(recipient)
} }
if (contacts.isNotEmpty()) { if (contacts.isNotEmpty()) {
threadDatabase.notifyUpdatedFromConfig() threadDatabase.notifyConversationListListeners()
} }
} }
override fun getLastUpdated(threadID: Long): Long {
val threadDB = DatabaseFactory.getThreadDatabase(context)
return threadDB.getLastUpdated(threadID)
}
override fun getAttachmentDataUri(attachmentId: AttachmentId): Uri { override fun getAttachmentDataUri(attachmentId: AttachmentId): Uri {
return PartAuthority.getAttachmentDataUri(attachmentId) return PartAuthority.getAttachmentDataUri(attachmentId)
} }

View File

@ -64,7 +64,7 @@ public class ThreadDatabase extends Database {
private static final String TAG = ThreadDatabase.class.getSimpleName(); private static final String TAG = ThreadDatabase.class.getSimpleName();
private Map<Long, Address> addressCache = new HashMap<>(); private final Map<Long, Address> addressCache = new HashMap<>();
public static final String TABLE_NAME = "thread"; public static final String TABLE_NAME = "thread";
public static final String ID = "_id"; public static final String ID = "_id";
@ -404,6 +404,21 @@ public class ThreadDatabase extends Database {
} }
} }
public Long getLastUpdated(long threadId) {
SQLiteDatabase db = databaseHelper.getReadableDatabase();
Cursor cursor = db.query(TABLE_NAME, new String[]{DATE}, ID_WHERE, new String[]{String.valueOf(threadId)}, null, null, null);
try {
if (cursor != null && cursor.moveToFirst()) {
return cursor.getLong(0);
}
return -1L;
} finally {
if (cursor != null) cursor.close();
}
}
public void deleteConversation(long threadId) { public void deleteConversation(long threadId) {
DatabaseFactory.getSmsDatabase(context).deleteThread(threadId); DatabaseFactory.getSmsDatabase(context).deleteThread(threadId);
DatabaseFactory.getMmsDatabase(context).deleteThread(threadId); DatabaseFactory.getMmsDatabase(context).deleteThread(threadId);
@ -471,7 +486,6 @@ public class ThreadDatabase extends Database {
} }
public @Nullable Recipient getRecipientForThreadId(long threadId) { public @Nullable Recipient getRecipientForThreadId(long threadId) {
// Loki - Cache the address
if (addressCache.containsKey(threadId) && addressCache.get(threadId) != null) { if (addressCache.containsKey(threadId) && addressCache.get(threadId) != null) {
return Recipient.from(context, addressCache.get(threadId), false); return Recipient.from(context, addressCache.get(threadId), false);
} }
@ -505,10 +519,6 @@ public class ThreadDatabase extends Database {
notifyConversationListeners(threadId); notifyConversationListeners(threadId);
} }
public void notifyUpdatedFromConfig() {
notifyConversationListListeners();
}
public boolean update(long threadId, boolean unarchive) { public boolean update(long threadId, boolean unarchive) {
MmsSmsDatabase mmsSmsDatabase = DatabaseFactory.getMmsSmsDatabase(context); MmsSmsDatabase mmsSmsDatabase = DatabaseFactory.getMmsSmsDatabase(context);
long count = mmsSmsDatabase.getConversationCount(threadId); long count = mmsSmsDatabase.getConversationCount(threadId);

View File

@ -7,9 +7,9 @@ import androidx.work.*
import nl.komponents.kovenant.Promise import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.all import nl.komponents.kovenant.all
import nl.komponents.kovenant.functional.map import nl.komponents.kovenant.functional.map
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.MessageReceiveJob import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.open_groups.OpenGroupV2 import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPollerV2
import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller
import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPollerV2 import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPollerV2
import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeAPI
import org.session.libsession.utilities.TextSecurePreferences import org.session.libsession.utilities.TextSecurePreferences
@ -57,7 +57,10 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor
promises.addAll(dmsPromise.get()) promises.addAll(dmsPromise.get())
// Closed groups // Closed groups
promises.addAll(ClosedGroupPoller().pollOnce()) val closedGroupPoller = ClosedGroupPollerV2() // Intentionally don't use shared
val storage = MessagingModuleConfiguration.shared.storage
val allGroupPublicKeys = storage.getAllClosedGroupPublicKeys()
allGroupPublicKeys.forEach { closedGroupPoller.poll(it) }
// Open Groups // Open Groups
val threadDB = DatabaseFactory.getLokiThreadDatabase(context) val threadDB = DatabaseFactory.getLokiThreadDatabase(context)

View File

@ -142,6 +142,7 @@ interface StorageProtocol {
fun getOrCreateThreadIdFor(publicKey: String, groupPublicKey: String?, openGroupID: String?): Long fun getOrCreateThreadIdFor(publicKey: String, groupPublicKey: String?, openGroupID: String?): Long
fun getThreadIdFor(address: Address): Long? fun getThreadIdFor(address: Address): Long?
fun getThreadIdForMms(mmsId: Long): Long fun getThreadIdForMms(mmsId: Long): Long
fun getLastUpdated(threadID: Long): Long
// Session Request // Session Request
fun getSessionRequestSentTimestamp(publicKey: String): Long? fun getSessionRequestSentTimestamp(publicKey: String): Long?

View File

@ -5,9 +5,9 @@ import org.session.libsession.database.MessageDataProvider
import org.session.libsession.database.StorageProtocol import org.session.libsession.database.StorageProtocol
class MessagingModuleConfiguration( class MessagingModuleConfiguration(
val context: Context, val context: Context,
val storage: StorageProtocol, val storage: StorageProtocol,
val messageDataProvider: MessageDataProvider val messageDataProvider: MessageDataProvider
) { ) {
companion object { companion object {

View File

@ -9,6 +9,7 @@ import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.messages.control.ClosedGroupControlMessage import org.session.libsession.messaging.messages.control.ClosedGroupControlMessage
import org.session.libsession.messaging.sending_receiving.MessageSender.Error import org.session.libsession.messaging.sending_receiving.MessageSender.Error
import org.session.libsession.messaging.sending_receiving.notifications.PushNotificationAPI import org.session.libsession.messaging.sending_receiving.notifications.PushNotificationAPI
import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPollerV2
import org.session.libsession.utilities.Address import org.session.libsession.utilities.Address
import org.session.libsession.utilities.GroupUtil import org.session.libsession.utilities.GroupUtil
import org.session.libsession.utilities.TextSecurePreferences import org.session.libsession.utilities.TextSecurePreferences
@ -71,6 +72,8 @@ fun MessageSender.create(name: String, members: Collection<String>): Promise<Str
storage.insertOutgoingInfoMessage(context, groupID, SignalServiceGroup.Type.CREATION, name, members, admins, threadID, sentTime) storage.insertOutgoingInfoMessage(context, groupID, SignalServiceGroup.Type.CREATION, name, members, admins, threadID, sentTime)
// Notify the PN server // Notify the PN server
PushNotificationAPI.performOperation(PushNotificationAPI.ClosedGroupOperation.Subscribe, groupPublicKey, userPublicKey) PushNotificationAPI.performOperation(PushNotificationAPI.ClosedGroupOperation.Subscribe, groupPublicKey, userPublicKey)
// Start polling
ClosedGroupPollerV2.shared.startPolling(groupPublicKey)
// Fulfill the promise // Fulfill the promise
deferred.resolve(groupID) deferred.resolve(groupID)
} }

View File

@ -12,6 +12,7 @@ import org.session.libsession.messaging.sending_receiving.attachments.PointerAtt
import org.session.libsession.messaging.sending_receiving.data_extraction.DataExtractionNotificationInfoMessage import org.session.libsession.messaging.sending_receiving.data_extraction.DataExtractionNotificationInfoMessage
import org.session.libsession.messaging.sending_receiving.link_preview.LinkPreview import org.session.libsession.messaging.sending_receiving.link_preview.LinkPreview
import org.session.libsession.messaging.sending_receiving.notifications.PushNotificationAPI import org.session.libsession.messaging.sending_receiving.notifications.PushNotificationAPI
import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPollerV2
import org.session.libsession.messaging.sending_receiving.quotes.QuoteModel import org.session.libsession.messaging.sending_receiving.quotes.QuoteModel
import org.session.libsession.utilities.Address import org.session.libsession.utilities.Address
import org.session.libsession.utilities.GroupRecord import org.session.libsession.utilities.GroupRecord
@ -286,6 +287,8 @@ private fun handleNewClosedGroup(sender: String, sentTimestamp: Long, groupPubli
storage.addClosedGroupEncryptionKeyPair(encryptionKeyPair, groupPublicKey) storage.addClosedGroupEncryptionKeyPair(encryptionKeyPair, groupPublicKey)
// Notify the PN server // Notify the PN server
PushNotificationAPI.performOperation(PushNotificationAPI.ClosedGroupOperation.Subscribe, groupPublicKey, storage.getUserPublicKey()!!) PushNotificationAPI.performOperation(PushNotificationAPI.ClosedGroupOperation.Subscribe, groupPublicKey, storage.getUserPublicKey()!!)
// Start polling
ClosedGroupPollerV2.shared.startPolling(groupPublicKey)
} }
private fun MessageReceiver.handleClosedGroupEncryptionKeyPair(message: ClosedGroupControlMessage) { private fun MessageReceiver.handleClosedGroupEncryptionKeyPair(message: ClosedGroupControlMessage) {
@ -562,5 +565,7 @@ fun MessageReceiver.disableLocalGroupAndUnsubscribe(groupPublicKey: String, grou
storage.removeMember(groupID, Address.fromSerialized(userPublicKey)) storage.removeMember(groupID, Address.fromSerialized(userPublicKey))
// Notify the PN server // Notify the PN server
PushNotificationAPI.performOperation(PushNotificationAPI.ClosedGroupOperation.Unsubscribe, groupPublicKey, userPublicKey) PushNotificationAPI.performOperation(PushNotificationAPI.ClosedGroupOperation.Unsubscribe, groupPublicKey, userPublicKey)
// Stop polling
ClosedGroupPollerV2.shared.stopPolling(groupPublicKey)
} }
// endregion // endregion

View File

@ -1,82 +0,0 @@
package org.session.libsession.messaging.sending_receiving.pollers
import android.os.Handler
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.snode.SnodeAPI
import org.session.libsignal.crypto.getRandomElementOrNull
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.successBackground
class ClosedGroupPoller {
private var isPolling = false
private val handler: Handler by lazy { Handler() }
private val task = object : Runnable {
override fun run() {
poll()
handler.postDelayed(this, pollInterval)
}
}
// region Settings
companion object {
private val pollInterval: Long = 6 * 1000
}
// endregion
// region Error
class InsufficientSnodesException() : Exception("No snodes left to poll.")
class PollingCanceledException() : Exception("Polling canceled.")
// endregion
// region Public API
public fun startIfNeeded() {
if (isPolling) { return }
isPolling = true
task.run()
}
public fun pollOnce(): List<Promise<Unit, Exception>> {
if (isPolling) { return listOf() }
isPolling = true
return poll()
}
public fun stopIfNeeded() {
isPolling = false
handler.removeCallbacks(task)
}
// endregion
// region Private API
private fun poll(): List<Promise<Unit, Exception>> {
if (!isPolling) { return listOf() }
val storage = MessagingModuleConfiguration.shared.storage
val publicKeys = storage.getAllActiveClosedGroupPublicKeys()
return publicKeys.map { publicKey ->
val promise = SnodeAPI.getSwarm(publicKey).bind { swarm ->
val snode = swarm.getRandomElementOrNull() ?: throw InsufficientSnodesException() // Should be cryptographically secure
if (!isPolling) { throw PollingCanceledException() }
SnodeAPI.getRawMessages(snode, publicKey).map {SnodeAPI.parseRawMessagesResponse(it, snode, publicKey) }
}
promise.successBackground { messages ->
if (!storage.isGroupActive(publicKey)) { return@successBackground }
messages.forEach { envelope ->
val job = MessageReceiveJob(envelope.toByteArray())
JobQueue.shared.add(job)
}
}
promise.fail {
Log.d("Loki", "Polling failed for closed group with public key: $publicKey due to error: $it.")
}
promise.map { }
}
}
// endregion
}

View File

@ -0,0 +1,115 @@
package org.session.libsession.messaging.sending_receiving.pollers
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.utilities.GroupUtil
import org.session.libsignal.crypto.getRandomElementOrNull
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.successBackground
import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import kotlin.math.min
class ClosedGroupPollerV2 {
private val executorService = Executors.newScheduledThreadPool(4)
private var isPolling = mutableMapOf<String, Boolean>()
private var futures = mutableMapOf<String, ScheduledFuture<*>>()
private fun isPolling(groupPublicKey: String): Boolean {
return isPolling[groupPublicKey] ?: false
}
companion object {
private val minPollInterval = 4 * 1000
private val maxPollInterval = 2 * 60 * 1000
@JvmStatic
val shared = ClosedGroupPollerV2()
}
class InsufficientSnodesException() : Exception("No snodes left to poll.")
class PollingCanceledException() : Exception("Polling canceled.")
fun start() {
val storage = MessagingModuleConfiguration.shared.storage
val allGroupPublicKeys = storage.getAllClosedGroupPublicKeys()
allGroupPublicKeys.forEach { startPolling(it) }
}
fun startPolling(groupPublicKey: String) {
if (isPolling(groupPublicKey)) { return }
setUpPolling(groupPublicKey)
isPolling[groupPublicKey] = true
}
fun stop() {
val storage = MessagingModuleConfiguration.shared.storage
val allGroupPublicKeys = storage.getAllClosedGroupPublicKeys()
allGroupPublicKeys.forEach { stopPolling(it) }
}
fun stopPolling(groupPublicKey: String) {
futures[groupPublicKey]?.cancel(false)
isPolling[groupPublicKey] = false
}
private fun setUpPolling(groupPublicKey: String) {
poll(groupPublicKey).success {
pollRecursively(groupPublicKey)
}.fail {
// The error is logged in poll(_:)
pollRecursively(groupPublicKey)
}
}
private fun pollRecursively(groupPublicKey: String) {
if (!isPolling(groupPublicKey)) { return }
// Get the received date of the last message in the thread. If we don't have any messages yet, pick some
// reasonable fake time interval to use instead.
val storage = MessagingModuleConfiguration.shared.storage
val groupID = GroupUtil.doubleEncodeGroupID(groupPublicKey)
val threadID = storage.getThreadID(groupID)?.toLongOrNull() ?: return
val lastUpdated = storage.getLastUpdated(threadID)
val timeSinceLastMessage = if (lastUpdated != -1L) Date().time - lastUpdated else 5 * 60 * 1000
val minPollInterval = Companion.minPollInterval
val limit: Long = 12 * 60 * 60 * 1000
val a = (Companion.maxPollInterval - minPollInterval).toDouble() / limit.toDouble()
val nextPollInterval = a * min(timeSinceLastMessage, limit) + minPollInterval
Log.d("Loki", "Next poll interval for closed group with public key: $groupPublicKey is ${nextPollInterval / 1000} s.")
executorService?.schedule({
poll(groupPublicKey).success {
pollRecursively(groupPublicKey)
}.fail {
// The error is logged in poll(_:)
pollRecursively(groupPublicKey)
}
}, nextPollInterval.toLong(), TimeUnit.MILLISECONDS)
}
fun poll(groupPublicKey: String): Promise<Unit, Exception> {
if (!isPolling(groupPublicKey)) { return Promise.of(Unit) }
val promise = SnodeAPI.getSwarm(groupPublicKey).bind { swarm ->
val snode = swarm.getRandomElementOrNull() ?: throw InsufficientSnodesException() // Should be cryptographically secure
if (!isPolling(groupPublicKey)) { throw PollingCanceledException() }
SnodeAPI.getRawMessages(snode, groupPublicKey).map { SnodeAPI.parseRawMessagesResponse(it, snode, groupPublicKey) }
}
promise.success { envelopes ->
if (!isPolling(groupPublicKey)) { return@success }
envelopes.forEach { envelope ->
val job = MessageReceiveJob(envelope.toByteArray())
JobQueue.shared.add(job)
}
}
promise.fail {
Log.d("Loki", "Polling failed for closed group with public key: $groupPublicKey due to error: $it.")
}
return promise.map { }
}
}