diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index 9f34af636b..afcfff4a21 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -30,27 +30,27 @@ import androidx.lifecycle.ProcessLifecycleOwner; import androidx.multidex.MultiDexApplication; import org.conscrypt.Conscrypt; -import org.session.libsession.messaging.MessagingModuleConfiguration; import org.session.libsession.avatars.AvatarHelper; +import org.session.libsession.messaging.MessagingModuleConfiguration; import org.session.libsession.messaging.file_server.FileServerAPI; import org.session.libsession.messaging.mentions.MentionsManager; import org.session.libsession.messaging.open_groups.OpenGroupAPI; import org.session.libsession.messaging.sending_receiving.notifications.MessageNotifier; import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller; import org.session.libsession.messaging.sending_receiving.pollers.Poller; -import org.session.libsession.utilities.Address; import org.session.libsession.snode.SnodeModule; +import org.session.libsession.utilities.Address; import org.session.libsession.utilities.IdentityKeyUtil; +import org.session.libsession.utilities.ProfileKeyUtil; import org.session.libsession.utilities.ProfilePictureUtilities; import org.session.libsession.utilities.SSKEnvironment; import org.session.libsession.utilities.TextSecurePreferences; import org.session.libsession.utilities.Util; import org.session.libsession.utilities.dynamiclanguage.DynamicLanguageContextWrapper; import org.session.libsession.utilities.dynamiclanguage.LocaleParser; -import org.session.libsession.utilities.ProfileKeyUtil; import org.session.libsignal.database.LokiAPIDatabaseProtocol; -import org.session.libsignal.utilities.ThreadUtils; import org.session.libsignal.utilities.Log; +import org.session.libsignal.utilities.ThreadUtils; import org.signal.aesgcmprovider.AesGcmProvider; import org.thoughtcrime.securesms.components.TypingStatusSender; import org.thoughtcrime.securesms.database.DatabaseFactory; @@ -67,7 +67,7 @@ import org.thoughtcrime.securesms.logging.UncaughtExceptionLogger; import org.thoughtcrime.securesms.loki.activities.HomeActivity; import org.thoughtcrime.securesms.loki.api.BackgroundPollWorker; import org.thoughtcrime.securesms.loki.api.LokiPushNotificationManager; -import org.thoughtcrime.securesms.loki.api.PublicChatManager; +import org.thoughtcrime.securesms.loki.api.OpenGroupManager; import org.thoughtcrime.securesms.loki.database.LokiAPIDatabase; import org.thoughtcrime.securesms.loki.database.LokiThreadDatabase; import org.thoughtcrime.securesms.loki.database.LokiUserDatabase; @@ -132,7 +132,6 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc public MessageNotifier messageNotifier = null; public Poller poller = null; public ClosedGroupPoller closedGroupPoller = null; - public PublicChatManager publicChatManager = null; public Broadcaster broadcaster = null; public SignalCommunicationModule communicationModule; private Job firebaseInstanceIdJob; @@ -178,7 +177,6 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc } setUpStorageAPIIfNeeded(); resubmitProfilePictureIfNeeded(); - publicChatManager = new PublicChatManager(this); updateOpenGroupProfilePicturesIfNeeded(); if (userPublicKey != null) { registerForFCMIfNeeded(false); @@ -208,8 +206,11 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc poller.setCaughtUp(false); } startPollingIfNeeded(); + // FIXME: Open group handling + /* publicChatManager.markAllAsNotCaughtUp(); - publicChatManager.startPollersIfNeeded(); + */ + OpenGroupManager.INSTANCE.startPolling(); } @Override @@ -230,9 +231,7 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc @Override public void onTerminate() { stopKovenant(); // Loki - if (publicChatManager != null) { - publicChatManager.stopPollers(); - } + OpenGroupManager.INSTANCE.stopPolling(); super.onTerminate(); } @@ -465,18 +464,6 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc } } - public void stopPolling() { - if (poller != null) { - poller.stopIfNeeded(); - } - if (closedGroupPoller != null) { - closedGroupPoller.stopIfNeeded(); - } - if (publicChatManager != null) { - publicChatManager.stopPollers(); - } - } - private void resubmitProfilePictureIfNeeded() { // Files expire on the file server after a while, so we simply re-upload the user's profile picture // at a certain interval to ensure it's always available. diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index bd8d7abab5..d1e620a117 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -37,6 +37,7 @@ import org.session.libsignal.messages.SignalServiceGroup import org.thoughtcrime.securesms.ApplicationContext import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper import org.thoughtcrime.securesms.jobs.RetrieveProfileAvatarJob +import org.thoughtcrime.securesms.loki.api.OpenGroupManager import org.thoughtcrime.securesms.loki.database.LokiThreadDatabase import org.thoughtcrime.securesms.loki.protocol.SessionMetaProtocol import org.thoughtcrime.securesms.loki.utilities.OpenGroupUtilities @@ -561,9 +562,9 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, val room = httpUrl.pathSegments().firstOrNull() ?: return val publicKey = httpUrl.queryParameter("public_key") ?: return - OpenGroupUtilities.addGroup(context, server.toString().removeSuffix("/"), room, publicKey) + OpenGroupManager.add(server.toString().removeSuffix("/"), room, publicKey, context) } else { - OpenGroupUtilities.addGroup(context, serverUrl, channel) + // TODO: No longer supported so let's remove this code } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/activities/HomeActivity.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/activities/HomeActivity.kt index b399d6e192..e1dd2fc864 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/activities/HomeActivity.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/activities/HomeActivity.kt @@ -40,6 +40,7 @@ import org.thoughtcrime.securesms.PassphraseRequiredActionBarActivity import org.thoughtcrime.securesms.conversation.ConversationActivity import org.thoughtcrime.securesms.database.DatabaseFactory import org.thoughtcrime.securesms.database.model.ThreadRecord +import org.thoughtcrime.securesms.loki.api.OpenGroupManager import org.thoughtcrime.securesms.loki.dialogs.* import org.thoughtcrime.securesms.loki.protocol.MultiDeviceProtocol import org.thoughtcrime.securesms.loki.utilities.* @@ -139,7 +140,7 @@ class HomeActivity : PassphraseRequiredActionBarActivity(), val userPublicKey = TextSecurePreferences.getLocalNumber(this) if (userPublicKey != null) { MentionsManager.configureIfNeeded(userPublicKey, userDB) - application.publicChatManager.startPollersIfNeeded() + OpenGroupManager.startPolling() JobQueue.shared.resumePendingJobs() } IP2Country.configureIfNeeded(this) @@ -359,14 +360,12 @@ class HomeActivity : PassphraseRequiredActionBarActivity(), apiDB.removeLastDeletionServerID(v1OpenGroup.channel, v1OpenGroup.server) apiDB.clearOpenGroupProfilePictureURL(v1OpenGroup.channel, v1OpenGroup.server) OpenGroupAPI.leave(v1OpenGroup.channel, v1OpenGroup.server) - ApplicationContext.getInstance(context).publicChatManager - .removeChat(v1OpenGroup.server, v1OpenGroup.channel) + // FIXME: No longer supported so let's remove this code } else if (v2OpenGroup != null) { val apiDB = DatabaseFactory.getLokiAPIDatabase(context) apiDB.removeLastMessageServerID(v2OpenGroup.room, v2OpenGroup.server) apiDB.removeLastDeletionServerID(v2OpenGroup.room, v2OpenGroup.server) - ApplicationContext.getInstance(context).publicChatManager - .removeChat(v2OpenGroup.server, v2OpenGroup.room) + OpenGroupManager.delete(v2OpenGroup.server, v2OpenGroup.room, this@HomeActivity) } else { threadDB.deleteConversation(threadID) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/activities/JoinPublicChatActivity.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/activities/JoinPublicChatActivity.kt index bec0732995..d6aead80aa 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/activities/JoinPublicChatActivity.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/activities/JoinPublicChatActivity.kt @@ -33,6 +33,7 @@ import org.thoughtcrime.securesms.BaseActionBarActivity import org.thoughtcrime.securesms.PassphraseRequiredActionBarActivity import org.thoughtcrime.securesms.conversation.ConversationActivity import org.thoughtcrime.securesms.groups.GroupManager +import org.thoughtcrime.securesms.loki.api.OpenGroupManager import org.thoughtcrime.securesms.loki.fragments.ScanQRCodeWrapperFragment import org.thoughtcrime.securesms.loki.fragments.ScanQRCodeWrapperFragmentDelegate import org.thoughtcrime.securesms.loki.protocol.MultiDeviceProtocol @@ -92,16 +93,15 @@ class JoinPublicChatActivity : PassphraseRequiredActionBarActivity(), ScanQRCode val server = HttpUrl.Builder().scheme(url.scheme()).host(url.host()).apply { if (url.port() != 80 || url.port() != 443) { this.port(url.port()) } // Non-standard port; add to server }.build() - val group = OpenGroupUtilities.addGroup(this@JoinPublicChatActivity, server.toString().removeSuffix("/"), room!!, publicKey!!) - val threadID = GroupManager.getOpenGroupThreadID(group.id, this@JoinPublicChatActivity) - val groupID = GroupUtil.getEncodedOpenGroupID(group.id.toByteArray()) + + val sanitizedServer = server.toString().removeSuffix("/") + val openGroupID = "$sanitizedServer.${room!!}" + OpenGroupManager.add(sanitizedServer, room, publicKey!!, this@JoinPublicChatActivity) + val threadID = GroupManager.getOpenGroupThreadID(openGroupID, this@JoinPublicChatActivity) + val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray()) threadID to groupID } else { - val channel: Long = 1 - val group = OpenGroupUtilities.addGroup(this@JoinPublicChatActivity, stringWithExplicitScheme, channel) - val threadID = GroupManager.getOpenGroupThreadID(group.id, this@JoinPublicChatActivity) - val groupID = GroupUtil.getEncodedOpenGroupID(group.id.toByteArray()) - threadID to groupID + throw Exception("No longer supported.") } MultiDeviceProtocol.forceSyncConfigurationNowIfNeeded(this@JoinPublicChatActivity) withContext(Dispatchers.Main) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt index ffe7e230e0..fc3e7c1bba 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt @@ -10,8 +10,7 @@ import nl.komponents.kovenant.functional.map import org.session.libsession.messaging.jobs.MessageReceiveJob import org.session.libsession.messaging.open_groups.OpenGroupV2 import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller -import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPoller -import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupV2Poller +import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPollerV2 import org.session.libsession.snode.SnodeAPI import org.session.libsession.utilities.TextSecurePreferences import org.session.libsignal.utilities.Log @@ -61,12 +60,14 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor promises.addAll(ClosedGroupPoller().pollOnce()) // Open Groups - val v2OpenGroups = DatabaseFactory.getLokiThreadDatabase(context).getAllV2OpenGroups().values.groupBy(OpenGroupV2::server) + val threadDB = DatabaseFactory.getLokiThreadDatabase(context) + val v2OpenGroups = threadDB.getAllV2OpenGroups() + val v2OpenGroupServers = v2OpenGroups.map { it.value.server }.toSet() - v2OpenGroups.values.map { groups -> - OpenGroupV2Poller(groups) - }.forEach { poller -> - promises.add(poller.compactPoll(true).map { }) + for (server in v2OpenGroupServers) { + val poller = OpenGroupPollerV2(server, null) + poller.hasStarted = true + promises.add(poller.poll(true)) } // Wait until all the promises are resolved diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/OpenGroupManager.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/OpenGroupManager.kt new file mode 100644 index 0000000000..62aca11265 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/OpenGroupManager.kt @@ -0,0 +1,101 @@ +package org.thoughtcrime.securesms.loki.api + +import android.content.Context +import android.graphics.Bitmap +import androidx.annotation.WorkerThread +import org.session.libsession.messaging.MessagingModuleConfiguration +import org.session.libsession.messaging.open_groups.OpenGroupAPIV2 +import org.session.libsession.messaging.open_groups.OpenGroupV2 +import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPollerV2 +import org.session.libsession.utilities.Util +import org.session.libsignal.utilities.ThreadUtils +import org.thoughtcrime.securesms.database.DatabaseFactory +import org.thoughtcrime.securesms.groups.GroupManager +import org.thoughtcrime.securesms.util.BitmapUtil +import java.util.concurrent.Executors + +object OpenGroupManager { + private val executorService = Executors.newScheduledThreadPool(4) + private var pollers = mutableMapOf() // One for each server + private var isPolling = false + + fun startPolling() { + if (isPolling) { return } + isPolling = true + val storage = MessagingModuleConfiguration.shared.storage + val servers = storage.getAllV2OpenGroups().values.map { it.server }.toSet() + servers.forEach { server -> + pollers[server]?.stop() // Shouldn't be necessary + val poller = OpenGroupPollerV2(server, executorService) + poller.startIfNeeded() + pollers[server] = poller + } + } + + fun stopPolling() { + pollers.forEach { it.value.stop() } + pollers.clear() + } + + @WorkerThread + fun add(server: String, room: String, publicKey: String, context: Context) { + val openGroupID = "$server.$room" + var threadID = GroupManager.getOpenGroupThreadID(openGroupID, context) + val storage = MessagingModuleConfiguration.shared.storage + val threadDB = DatabaseFactory.getLokiThreadDatabase(context) + // Check it it's added already + val existingOpenGroup = threadDB.getOpenGroupChat(threadID) + if (existingOpenGroup != null) { return } + // Clear any existing data if needed + storage.removeLastDeletionServerId(room, server) + storage.removeLastMessageServerId(room, server) + // Store the public key + storage.setOpenGroupPublicKey(server,publicKey) + // Get an auth token + OpenGroupAPIV2.getAuthToken(room, server).get() + // Get group info + val info = OpenGroupAPIV2.getInfo(room, server).get() + // Download the group image + // FIXME: Don't wait for the image to download + val image: Bitmap? + if (threadID < 0) { + val profilePictureAsByteArray = try { + OpenGroupAPIV2.downloadOpenGroupProfilePicture(info.id, server).get() + } catch (e: Exception) { + null + } + image = BitmapUtil.fromByteArray(profilePictureAsByteArray) + // Create the group locally + threadID = GroupManager.createOpenGroup(openGroupID, context, image, info.name).threadId + } + val openGroup = OpenGroupV2(server, room, info.name, publicKey) + threadDB.setOpenGroupChat(openGroup, threadID) + // Start the poller if needed + if (pollers[server] == null) { + val poller = OpenGroupPollerV2(server, executorService) + Util.runOnMain { poller.startIfNeeded() } + pollers[server] = poller + } + } + + fun delete(server: String, room: String, context: Context) { + val storage = MessagingModuleConfiguration.shared.storage + val threadDB = DatabaseFactory.getThreadDatabase(context) + val openGroupID = "$server.$room" + val threadID = GroupManager.getOpenGroupThreadID(openGroupID, context) + val groupID = threadDB.getRecipientForThreadId(threadID)!!.address.serialize() + // Stop the poller if needed + val openGroups = storage.getAllV2OpenGroups().filter { it.value.server == server } + if (openGroups.count() == 1) { + val poller = pollers[server] + poller?.stop() + pollers.remove(server) + } + // Delete + ThreadUtils.queue { + storage.removeLastDeletionServerId(room, server) + storage.removeLastMessageServerId(room, server) + GroupManager.deleteGroup(groupID, context) // Must be invoked on a background thread + } + } +} \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt deleted file mode 100644 index 70bb78d8b6..0000000000 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt +++ /dev/null @@ -1,212 +0,0 @@ -package org.thoughtcrime.securesms.loki.api - -import android.content.Context -import android.database.ContentObserver -import android.graphics.Bitmap -import android.text.TextUtils -import androidx.annotation.WorkerThread -import org.session.libsession.messaging.MessagingModuleConfiguration -import org.session.libsession.messaging.open_groups.* -import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPoller -import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupV2Poller -import org.session.libsession.utilities.TextSecurePreferences -import org.session.libsession.utilities.Util -import org.session.libsignal.utilities.ThreadUtils -import org.thoughtcrime.securesms.database.DatabaseContentProviders -import org.thoughtcrime.securesms.database.DatabaseFactory -import org.thoughtcrime.securesms.groups.GroupManager -import org.thoughtcrime.securesms.util.BitmapUtil -import java.util.concurrent.Executors - -class PublicChatManager(private val context: Context) { - private var chats = mutableMapOf() - private var v2Chats = mutableMapOf() - private val pollers = mutableMapOf() - private val v2Pollers = mutableMapOf() - private val observers = mutableMapOf() - private var isPolling = false - private val executorService = Executors.newScheduledThreadPool(4) - - public fun areAllCaughtUp(): Boolean { - var areAllCaughtUp = true - refreshChatsAndPollers() - for ((threadID, _) in chats) { - val poller = pollers[threadID] - areAllCaughtUp = if (poller != null) areAllCaughtUp && poller.isCaughtUp else areAllCaughtUp - } - return areAllCaughtUp - } - - public fun markAllAsNotCaughtUp() { - refreshChatsAndPollers() - for ((threadID, chat) in chats) { - val poller = pollers[threadID] ?: OpenGroupPoller(chat, executorService) - poller.isCaughtUp = false - } - for ((_,poller) in v2Pollers) { - poller.isCaughtUp = false - } - } - - public fun startPollersIfNeeded() { - refreshChatsAndPollers() - - for ((threadId, chat) in chats) { - val poller = pollers[threadId] ?: OpenGroupPoller(chat, executorService) - poller.startIfNeeded() - listenToThreadDeletion(threadId) - if (!pollers.containsKey(threadId)) { pollers[threadId] = poller } - } - v2Pollers.values.forEach { it.stop() } - v2Pollers.clear() - v2Chats.entries.groupBy { (_, group) -> group.server }.forEach { (server, threadedRooms) -> - val poller = OpenGroupV2Poller(threadedRooms.map { it.value }, executorService) - poller.startIfNeeded() - threadedRooms.forEach { (thread, _) -> - listenToThreadDeletion(thread) - } - v2Pollers[server] = poller - } - - isPolling = true - } - - public fun stopPollers() { - pollers.values.forEach { it.stop() } - isPolling = false - executorService.shutdown() - } - - //TODO Declare a specific type of checked exception instead of "Exception". - @WorkerThread - @Throws(java.lang.Exception::class) - public fun addChat(server: String, channel: Long): OpenGroup { - // Ensure the auth token is acquired. - OpenGroupAPI.getAuthToken(server).get() - - val channelInfo = OpenGroupAPI.getChannelInfo(channel, server).get() - return addChat(server, channel, channelInfo) - } - - @WorkerThread - public fun addChat(server: String, channel: Long, info: OpenGroupInfo): OpenGroup { - val chat = OpenGroup(channel, server, info.displayName, true) - var threadID = GroupManager.getOpenGroupThreadID(chat.id, context) - var profilePicture: Bitmap? = null - // Create the group if we don't have one - if (threadID < 0) { - if (info.profilePictureURL.isNotEmpty()) { - val profilePictureAsByteArray = OpenGroupAPI.downloadOpenGroupProfilePicture(server, info.profilePictureURL) - profilePicture = BitmapUtil.fromByteArray(profilePictureAsByteArray) - } - val result = GroupManager.createOpenGroup(chat.id, context, profilePicture, chat.displayName) - threadID = result.threadId - } - DatabaseFactory.getLokiThreadDatabase(context).setPublicChat(chat, threadID) - // Set our name on the server - val displayName = TextSecurePreferences.getProfileName(context) - if (!TextUtils.isEmpty(displayName)) { - OpenGroupAPI.setDisplayName(displayName, server) - } - // Start polling - Util.runOnMain { startPollersIfNeeded() } - - return chat - } - - @WorkerThread - fun addChat(server: String, room: String, info: OpenGroupAPIV2.Info, publicKey: String): OpenGroupV2 { - val chat = OpenGroupV2(server, room, info.name, publicKey) - var threadID = GroupManager.getOpenGroupThreadID(chat.id, context) - val profilePicture: Bitmap? - if (threadID < 0) { - val profilePictureAsByteArray = try { - OpenGroupAPIV2.downloadOpenGroupProfilePicture(info.id,server).get() - } catch (e: Exception) { - null - } - profilePicture = BitmapUtil.fromByteArray(profilePictureAsByteArray) - val result = GroupManager.createOpenGroup(chat.id, context, profilePicture, info.name) - threadID = result.threadId - } - DatabaseFactory.getLokiThreadDatabase(context).setOpenGroupChat(chat, threadID) - Util.runOnMain { startPollersIfNeeded() } - return chat - } - - public fun removeChat(server: String, channel: Long) { - val threadDB = DatabaseFactory.getThreadDatabase(context) - val groupId = OpenGroup.getId(channel, server) - val threadId = GroupManager.getOpenGroupThreadID(groupId, context) - val groupAddress = threadDB.getRecipientForThreadId(threadId)!!.address.serialize() - ThreadUtils.queue { - GroupManager.deleteGroup(groupAddress, context) // Must be invoked on a background thread - Util.runOnMain { startPollersIfNeeded() } - } - } - - fun removeChat(server: String, room: String) { - val threadDB = DatabaseFactory.getThreadDatabase(context) - val groupId = "$server.$room" - val threadId = GroupManager.getOpenGroupThreadID(groupId, context) - val groupAddress = threadDB.getRecipientForThreadId(threadId)!!.address.serialize() - ThreadUtils.queue { - GroupManager.deleteGroup(groupAddress, context) // Must be invoked on a background thread - Util.runOnMain { startPollersIfNeeded() } - } - } - - private fun refreshChatsAndPollers() { - val storage = MessagingModuleConfiguration.shared.storage - val chatsInDB = storage.getAllOpenGroups() - val v2ChatsInDB = storage.getAllV2OpenGroups() - val removedChatThreadIds = chats.keys.filter { !chatsInDB.keys.contains(it) } - removedChatThreadIds.forEach { pollers.remove(it)?.stop() } - - // Only append to chats if we have a thread for the chat - chats = chatsInDB.filter { GroupManager.getOpenGroupThreadID(it.value.id, context) > -1 }.toMutableMap() - v2Chats = v2ChatsInDB.filter { GroupManager.getOpenGroupThreadID(it.value.id, context) > -1 }.toMutableMap() - } - - private fun listenToThreadDeletion(threadID: Long) { - if (threadID < 0 || observers[threadID] != null) { return } - val observer = createDeletionObserver(threadID) { - val chat = chats[threadID] - - // Reset last message cache - if (chat != null) { - val apiDatabase = DatabaseFactory.getLokiAPIDatabase(context) - apiDatabase.removeLastDeletionServerID(chat.channel, chat.server) - apiDatabase.removeLastMessageServerID(chat.channel, chat.server) - } - - DatabaseFactory.getLokiThreadDatabase(context).removePublicChat(threadID) - pollers.remove(threadID)?.stop() - v2Pollers.values.forEach { it.stop() } - v2Pollers.clear() - observers.remove(threadID) - startPollersIfNeeded() - } - observers[threadID] = observer - - context.applicationContext.contentResolver.registerContentObserver(DatabaseContentProviders.Conversation.getUriForThread(threadID), true, observer) - } - - private fun createDeletionObserver(threadID: Long, onDelete: Runnable): ContentObserver { - return object : ContentObserver(null) { - - override fun onChange(selfChange: Boolean) { - super.onChange(selfChange) - // Stop the poller if thread is deleted - try { - if (!DatabaseFactory.getThreadDatabase(context).hasThread(threadID)) { - onDelete.run() - context.applicationContext.contentResolver.unregisterContentObserver(this) - } - } catch (e: Exception) { - // TODO: Handle - } - } - } - } -} \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/utilities/OpenGroupUtilities.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/utilities/OpenGroupUtilities.kt index fa0d7a82e2..8c08d53e3b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/utilities/OpenGroupUtilities.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/utilities/OpenGroupUtilities.kt @@ -22,54 +22,6 @@ object OpenGroupUtilities { private const val TAG = "OpenGroupUtilities" - @JvmStatic - @WorkerThread - fun addGroup(context: Context, server: String, room: String, publicKey: String): OpenGroupV2 { - val groupId = "$server.$room" - val threadID = GroupManager.getOpenGroupThreadID(groupId, context) - val openGroup = DatabaseFactory.getLokiThreadDatabase(context).getOpenGroupChat(threadID) - if (openGroup != null) return openGroup - - MessagingModuleConfiguration.shared.storage.setOpenGroupPublicKey(server,publicKey) - OpenGroupAPIV2.getAuthToken(room, server).get() - val groupInfo = OpenGroupAPIV2.getInfo(room,server).get() - val application = ApplicationContext.getInstance(context) - - val storage = MessagingModuleConfiguration.shared.storage - storage.removeLastDeletionServerId(room, server) - storage.removeLastMessageServerId(room, server) - val group = application.publicChatManager.addChat(server, room, groupInfo, publicKey) - - return group - } - - @JvmStatic - @WorkerThread - @Throws(Exception::class) - fun addGroup(context: Context, url: String, channel: Long): OpenGroup { - // Check for an existing group. - val groupID = OpenGroup.getId(channel, url) - val threadID = GroupManager.getOpenGroupThreadID(groupID, context) - val openGroup = DatabaseFactory.getLokiThreadDatabase(context).getPublicChat(threadID) - if (openGroup != null) { return openGroup } - - // Add the new group. - val application = ApplicationContext.getInstance(context) - val displayName = TextSecurePreferences.getProfileName(context) - - val group = application.publicChatManager.addChat(url, channel) - - DatabaseFactory.getLokiAPIDatabase(context).removeLastMessageServerID(channel, url) - DatabaseFactory.getLokiAPIDatabase(context).removeLastDeletionServerID(channel, url) - OpenGroupAPI.getMessages(channel, url) - OpenGroupAPI.setDisplayName(displayName, url) - OpenGroupAPI.join(channel, url) - val profileKey: ByteArray = ProfileKeyUtil.getProfileKey(context) - val profileUrl: String? = TextSecurePreferences.getProfilePictureURL(context) - OpenGroupAPI.setProfilePicture(url, profileKey, profileUrl) - return group - } - /** * Pulls the general public chat data from the server and updates related records. * Fires [GroupInfoUpdatedEvent] on [EventBus] upon success. diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/views/OpenGroupInvitationView.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/views/OpenGroupInvitationView.kt index 32736ad7fb..edfc81cf79 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/views/OpenGroupInvitationView.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/views/OpenGroupInvitationView.kt @@ -11,6 +11,7 @@ import kotlinx.coroutines.launch import network.loki.messenger.R import org.session.libsession.utilities.OpenGroupUrlParser import org.session.libsignal.utilities.Log +import org.thoughtcrime.securesms.loki.api.OpenGroupManager import org.thoughtcrime.securesms.loki.protocol.MultiDeviceProtocol import org.thoughtcrime.securesms.loki.utilities.OpenGroupUtilities @@ -56,12 +57,7 @@ class OpenGroupInvitationView : FrameLayout { GlobalScope.launch(Dispatchers.IO) { try { dialog.dismiss() - val group = OpenGroupUtilities.addGroup( - context, - openGroup.server, - openGroup.room, - openGroup.serverPublicKey - ) + OpenGroupManager.add(openGroup.server, openGroup.room, openGroup.serverPublicKey, context) MultiDeviceProtocol.forceSyncConfigurationNowIfNeeded(context) } catch (e: Exception) { Log.e("Loki", "Failed to join open group.", e) diff --git a/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java b/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java index fcf7eb0a31..46b4d836d7 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java +++ b/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java @@ -12,7 +12,6 @@ import org.session.libsession.utilities.recipients.Recipient; import org.session.libsession.utilities.Debouncer; import org.session.libsignal.utilities.ThreadUtils; import org.thoughtcrime.securesms.ApplicationContext; -import org.thoughtcrime.securesms.loki.api.PublicChatManager; import java.util.concurrent.TimeUnit; @@ -42,16 +41,19 @@ public class OptimizedMessageNotifier implements MessageNotifier { @Override public void updateNotification(@NonNull Context context) { - Poller lokiPoller = ApplicationContext.getInstance(context).poller; - PublicChatManager publicChatManager = ApplicationContext.getInstance(context).publicChatManager; + Poller poller = ApplicationContext.getInstance(context).poller; + // FIXME: Open group handling boolean isCaughtUp = true; - if (lokiPoller != null) { - isCaughtUp = isCaughtUp && lokiPoller.isCaughtUp(); + if (poller != null) { + isCaughtUp = isCaughtUp && poller.isCaughtUp(); } + // FIXME: Open group handling + /* if (publicChatManager != null) { isCaughtUp = isCaughtUp && publicChatManager.areAllCaughtUp(); } + */ if (isCaughtUp) { performOnBackgroundThreadIfNeeded(() -> wrapped.updateNotification(context)); @@ -63,15 +65,18 @@ public class OptimizedMessageNotifier implements MessageNotifier { @Override public void updateNotification(@NonNull Context context, long threadId) { Poller lokiPoller = ApplicationContext.getInstance(context).poller; - PublicChatManager publicChatManager = ApplicationContext.getInstance(context).publicChatManager; + // FIXME: Open group handling boolean isCaughtUp = true; if (lokiPoller != null) { isCaughtUp = isCaughtUp && lokiPoller.isCaughtUp(); } + // FIXME: Open group handling + /* if (publicChatManager != null) { isCaughtUp = isCaughtUp && publicChatManager.areAllCaughtUp(); } + */ if (isCaughtUp) { performOnBackgroundThreadIfNeeded(() -> wrapped.updateNotification(context, threadId)); @@ -83,15 +88,18 @@ public class OptimizedMessageNotifier implements MessageNotifier { @Override public void updateNotification(@NonNull Context context, long threadId, boolean signal) { Poller lokiPoller = ApplicationContext.getInstance(context).poller; - PublicChatManager publicChatManager = ApplicationContext.getInstance(context).publicChatManager; + // FIXME: Open group handling boolean isCaughtUp = true; if (lokiPoller != null) { isCaughtUp = isCaughtUp && lokiPoller.isCaughtUp(); } + // FIXME: Open group handling + /* if (publicChatManager != null) { isCaughtUp = isCaughtUp && publicChatManager.areAllCaughtUp(); } + */ if (isCaughtUp) { performOnBackgroundThreadIfNeeded(() -> wrapped.updateNotification(context, threadId, signal)); @@ -103,15 +111,18 @@ public class OptimizedMessageNotifier implements MessageNotifier { @Override public void updateNotification(@androidx.annotation.NonNull Context context, boolean signal, int reminderCount) { Poller lokiPoller = ApplicationContext.getInstance(context).poller; - PublicChatManager publicChatManager = ApplicationContext.getInstance(context).publicChatManager; + // FIXME: Open group handling boolean isCaughtUp = true; if (lokiPoller != null) { isCaughtUp = isCaughtUp && lokiPoller.isCaughtUp(); } + // FIXME: Open group handling + /* if (publicChatManager != null) { isCaughtUp = isCaughtUp && publicChatManager.areAllCaughtUp(); } + */ if (isCaughtUp) { performOnBackgroundThreadIfNeeded(() -> wrapped.updateNotification(context, signal, reminderCount)); diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt index 08b386d95e..1b960718a1 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt @@ -26,7 +26,7 @@ class ClosedGroupPoller { // region Settings companion object { - private val pollInterval: Long = 2 * 1000 + private val pollInterval: Long = 6 * 1000 } // endregion @@ -57,7 +57,8 @@ class ClosedGroupPoller { // region Private API private fun poll(): List> { if (!isPolling) { return listOf() } - val publicKeys = MessagingModuleConfiguration.shared.storage.getAllActiveClosedGroupPublicKeys() + val storage = MessagingModuleConfiguration.shared.storage + val publicKeys = storage.getAllActiveClosedGroupPublicKeys() return publicKeys.map { publicKey -> val promise = SnodeAPI.getSwarm(publicKey).bind { swarm -> val snode = swarm.getRandomElementOrNull() ?: throw InsufficientSnodesException() // Should be cryptographically secure @@ -65,10 +66,7 @@ class ClosedGroupPoller { SnodeAPI.getRawMessages(snode, publicKey).map {SnodeAPI.parseRawMessagesResponse(it, snode, publicKey) } } promise.successBackground { messages -> - if (!MessagingModuleConfiguration.shared.storage.isGroupActive(publicKey)) { - // ignore inactive group's messages - return@successBackground - } + if (!storage.isGroupActive(publicKey)) { return@successBackground } messages.forEach { envelope -> val job = MessageReceiveJob(envelope.toByteArray()) JobQueue.shared.add(job) @@ -77,7 +75,7 @@ class ClosedGroupPoller { promise.fail { Log.d("Loki", "Polling failed for closed group with public key: $publicKey due to error: $it.") } - promise.map { Unit } + promise.map { } } } // endregion diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt new file mode 100644 index 0000000000..320c3afd97 --- /dev/null +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt @@ -0,0 +1,92 @@ +package org.session.libsession.messaging.sending_receiving.pollers + +import nl.komponents.kovenant.Promise +import nl.komponents.kovenant.functional.map +import org.session.libsession.messaging.MessagingModuleConfiguration +import org.session.libsession.messaging.jobs.JobQueue +import org.session.libsession.messaging.jobs.MessageReceiveJob +import org.session.libsession.messaging.open_groups.OpenGroupAPIV2 +import org.session.libsession.messaging.open_groups.OpenGroupMessageV2 +import org.session.libsession.utilities.Address +import org.session.libsession.utilities.GroupUtil +import org.session.libsignal.protos.SignalServiceProtos +import org.session.libsignal.utilities.Log +import org.session.libsignal.utilities.successBackground +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.TimeUnit + +class OpenGroupPollerV2(private val server: String, private val executorService: ScheduledExecutorService?) { + var hasStarted = false + private var future: ScheduledFuture<*>? = null + + companion object { + private val pollInterval: Long = 4 * 1000 + } + + fun startIfNeeded() { + if (hasStarted) { return } + hasStarted = true + future = executorService?.schedule(::poll, 0, TimeUnit.MILLISECONDS) + } + + fun stop() { + future?.cancel(false) + hasStarted = false + } + + fun poll(isBackgroundPoll: Boolean = false): Promise { + val storage = MessagingModuleConfiguration.shared.storage + val rooms = storage.getAllV2OpenGroups().values.filter { it.server == server }.map { it.room } + return OpenGroupAPIV2.compactPoll(rooms, server).successBackground { responses -> + responses.forEach { (room, response) -> + val openGroupID = "$server.$room" + handleNewMessages(openGroupID, response.messages, isBackgroundPoll) + handleDeletedMessages(openGroupID, response.deletions) + } + }.always { + executorService?.schedule(this@OpenGroupPollerV2::poll, OpenGroupPollerV2.pollInterval, TimeUnit.MILLISECONDS) + }.map { } + } + + private fun handleNewMessages(openGroupID: String, messages: List, isBackgroundPoll: Boolean) { + if (!hasStarted) { return } + messages.sortedBy { it.serverID!! }.forEach { message -> + try { + val senderPublicKey = message.sender!! + val builder = SignalServiceProtos.Envelope.newBuilder() + builder.type = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE + builder.source = senderPublicKey + builder.sourceDevice = 1 + builder.content = message.toProto().toByteString() + builder.timestamp = message.sentTimestamp + val envelope = builder.build() + val job = MessageReceiveJob(envelope.toByteArray(), message.serverID, openGroupID) + if (isBackgroundPoll) { + job.executeAsync() + } else { + JobQueue.shared.add(job) + } + } catch (e: Exception) { + Log.e("Loki", "Exception parsing message", e) + } + } + } + + private fun handleDeletedMessages(openGroupID: String, deletedMessageServerIDs: List) { + val storage = MessagingModuleConfiguration.shared.storage + val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider + val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray()) + val threadID = storage.getThreadIdFor(Address.fromSerialized(groupID)) ?: return + val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { serverID -> + val messageID = dataProvider.getMessageID(serverID, threadID) + if (messageID == null) { + Log.d("Loki", "Couldn't find message ID for message with serverID: $serverID.") + } + messageID + } + deletedMessageIDs.forEach { (messageId, isSms) -> + MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms) + } + } +} \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupV2Poller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupV2Poller.kt deleted file mode 100644 index f0d03e2c29..0000000000 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupV2Poller.kt +++ /dev/null @@ -1,134 +0,0 @@ -package org.session.libsession.messaging.sending_receiving.pollers - -import nl.komponents.kovenant.Promise -import org.session.libsession.messaging.MessagingModuleConfiguration -import org.session.libsession.messaging.jobs.JobQueue -import org.session.libsession.messaging.jobs.MessageReceiveJob -import org.session.libsession.messaging.open_groups.OpenGroupAPIV2 -import org.session.libsession.messaging.open_groups.OpenGroupMessageV2 -import org.session.libsession.messaging.open_groups.OpenGroupV2 -import org.session.libsession.utilities.Address -import org.session.libsession.utilities.GroupUtil -import org.session.libsignal.protos.SignalServiceProtos -import org.session.libsignal.utilities.Log -import org.session.libsignal.utilities.successBackground -import java.util.concurrent.ScheduledExecutorService -import java.util.concurrent.ScheduledFuture -import java.util.concurrent.TimeUnit - -class OpenGroupV2Poller(private val openGroups: List, private val executorService: ScheduledExecutorService? = null) { - - private var hasStarted = false - @Volatile private var isPollOngoing = false - var isCaughtUp = false - - private val cancellableFutures = mutableListOf>() - - // use this as a receive time-based window to calculate re-poll interval - private val receivedQueue = ArrayDeque(50) - - private fun calculatePollInterval(): Long { - // sample last default poll time * 2 - while (receivedQueue.size > 50) { - receivedQueue.removeLast() - } - val sampleWindow = System.currentTimeMillis() - pollForNewMessagesInterval * 2 - val numberInSample = receivedQueue.toList().filter { it > sampleWindow }.size.coerceAtLeast(1) - return ((2 + (50 / numberInSample / 20)*5) * 1000).toLong() - } - - // region Settings - companion object { - private val pollForNewMessagesInterval: Long = 10 * 1000 - } - // endregion - - // region Lifecycle - fun startIfNeeded() { - if (hasStarted || executorService == null) return - cancellableFutures += executorService.schedule(::compactPoll, 0, TimeUnit.MILLISECONDS) - hasStarted = true - } - - fun stop() { - cancellableFutures.forEach { future -> - future.cancel(false) - } - cancellableFutures.clear() - hasStarted = false - } - // endregion - - // region Polling - - private fun compactPoll(): Promise { - return compactPoll(false) - } - - fun compactPoll(isBackgroundPoll: Boolean): Promise { - if (isPollOngoing || !hasStarted) return Promise.of(Unit) - isPollOngoing = true - val server = openGroups.first().server // assume all the same server - val rooms = openGroups.map { it.room } - return OpenGroupAPIV2.compactPoll(rooms = rooms, server).successBackground { results -> - results.forEach { (room, results) -> - val serverRoomId = "$server.$room" - handleNewMessages(serverRoomId, results.messages.sortedBy { it.serverID }, isBackgroundPoll) - handleDeletedMessages(serverRoomId,results.deletions) - } - }.always { - isPollOngoing = false - if (!isBackgroundPoll) { - val delay = calculatePollInterval() - executorService?.schedule(this@OpenGroupV2Poller::compactPoll, delay, TimeUnit.MILLISECONDS) - } - } - } - - private fun handleNewMessages(serverRoomId: String, newMessages: List, isBackgroundPoll: Boolean) { - if (!hasStarted) return - newMessages.forEach { message -> - try { - val senderPublicKey = message.sender!! - // Main message - // Envelope - val builder = SignalServiceProtos.Envelope.newBuilder() - builder.type = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE - builder.source = senderPublicKey - builder.sourceDevice = 1 - builder.content = message.toProto().toByteString() - builder.timestamp = message.sentTimestamp - val envelope = builder.build() - val job = MessageReceiveJob(envelope.toByteArray(), message.serverID, serverRoomId) - Log.d("Loki", "Scheduling Job $job") - if (isBackgroundPoll) { - job.executeAsync() - // The promise is just used to keep track of when we're done - } else { - JobQueue.shared.add(job) - } - receivedQueue.addFirst(message.sentTimestamp) - } catch (e: Exception) { - Log.e("Loki", "Exception parsing message", e) - } - } - } - - private fun handleDeletedMessages(serverRoomId: String, deletedMessageServerIDs: List) { - val messagingModule = MessagingModuleConfiguration.shared - val address = GroupUtil.getEncodedOpenGroupID(serverRoomId.toByteArray()) - val threadId = messagingModule.storage.getThreadIdFor(Address.fromSerialized(address)) ?: return - - val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { serverId -> - val id = messagingModule.messageDataProvider.getMessageID(serverId, threadId) - if (id == null) { - Log.d("Loki", "Couldn't find server ID $serverId") - } - id - } - deletedMessageIDs.forEach { (messageId, isSms) -> - MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms) - } - } - // endregion -} \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt index b64b96eb59..bbf2620f97 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt @@ -18,7 +18,7 @@ class Poller { var userPublicKey = MessagingModuleConfiguration.shared.storage.getUserPublicKey() ?: "" private var hasStarted: Boolean = false private val usedSnodes: MutableSet = mutableSetOf() - public var isCaughtUp = false + var isCaughtUp = false // region Settings companion object {