diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index ef4f5c46a3..a95b6c28c6 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -64,7 +64,6 @@ import org.thoughtcrime.securesms.dependencies.DatabaseComponent; import org.thoughtcrime.securesms.dependencies.DatabaseModule; import org.thoughtcrime.securesms.emoji.EmojiSource; import org.thoughtcrime.securesms.groups.OpenGroupManager; -import org.thoughtcrime.securesms.groups.OpenGroupMigrator; import org.thoughtcrime.securesms.home.HomeActivity; import org.thoughtcrime.securesms.jobmanager.JobManager; import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer; @@ -206,9 +205,6 @@ public class ApplicationContext extends Application implements DefaultLifecycleO storage, messageDataProvider, ()-> KeyPairUtilities.INSTANCE.getUserED25519KeyPair(this)); - // migrate session open group data - OpenGroupMigrator.migrate(getDatabaseComponent()); - // end migration callMessageProcessor = new CallMessageProcessor(this, textSecurePreferences, ProcessLifecycleOwner.get().getLifecycle(), storage); Log.i(TAG, "onCreate()"); startKovenant(); diff --git a/app/src/main/java/org/thoughtcrime/securesms/PassphrasePromptActivity.java b/app/src/main/java/org/thoughtcrime/securesms/PassphrasePromptActivity.java index 63b42c4936..afc993df8a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/PassphrasePromptActivity.java +++ b/app/src/main/java/org/thoughtcrime/securesms/PassphrasePromptActivity.java @@ -210,8 +210,7 @@ public class PassphrasePromptActivity extends BaseActionBarActivity { try { signature = biometricSecretProvider.getOrCreateBiometricSignature(this); hasSignatureObject = true; - throw new InvalidKeyException("e"); - } catch (InvalidKeyException e) { + } catch (Exception e) { signature = null; hasSignatureObject = false; Log.e(TAG, "Error getting / creating signature", e); diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationActivityV2.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationActivityV2.kt index 7adf5f7dd3..8ba8f10688 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationActivityV2.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationActivityV2.kt @@ -963,6 +963,18 @@ class ConversationActivityV2 : PassphraseRequiredActionBarActivity(), InputBarDe Toast.makeText(this, R.string.copied_to_clipboard, Toast.LENGTH_SHORT).show() } + override fun copyOpenGroupUrl(thread: Recipient) { + if (!thread.isOpenGroupRecipient) { return } + + val threadId = threadDb.getThreadIdIfExistsFor(thread) ?: return + val openGroup = lokiThreadDb.getOpenGroupChat(threadId) ?: return + + val clip = ClipData.newPlainText("Community URL", openGroup.joinURL) + val manager = getSystemService(PassphraseRequiredActionBarActivity.CLIPBOARD_SERVICE) as ClipboardManager + manager.setPrimaryClip(clip) + Toast.makeText(this, R.string.copied_to_clipboard, Toast.LENGTH_SHORT).show() + } + override fun showExpiringMessagesDialog(thread: Recipient) { if (thread.isClosedGroupRecipient) { val group = groupDb.getGroup(thread.address.toGroupString()).orNull() diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/menus/ConversationMenuHelper.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/menus/ConversationMenuHelper.kt index 663dd2e255..8a6c84aecf 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/menus/ConversationMenuHelper.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/menus/ConversationMenuHelper.kt @@ -78,6 +78,10 @@ object ConversationMenuHelper { inflater.inflate(R.menu.menu_conversation_expiration_off, menu) } } + // One-on-one chat menu allows copying the session id + if (thread.isContactRecipient) { + inflater.inflate(R.menu.menu_conversation_copy_session_id, menu) + } // One-on-one chat menu (options that should only be present for one-on-one chats) if (thread.isContactRecipient) { if (thread.isBlocked) { @@ -154,6 +158,7 @@ object ConversationMenuHelper { R.id.menu_block -> { block(context, thread, deleteThread = false) } R.id.menu_block_delete -> { blockAndDelete(context, thread) } R.id.menu_copy_session_id -> { copySessionID(context, thread) } + R.id.menu_copy_open_group_url -> { copyOpenGroupUrl(context, thread) } R.id.menu_edit_group -> { editClosedGroup(context, thread) } R.id.menu_leave_group -> { leaveClosedGroup(context, thread) } R.id.menu_invite_to_open_group -> { inviteContacts(context, thread) } @@ -270,6 +275,12 @@ object ConversationMenuHelper { listener.copySessionID(thread.address.toString()) } + private fun copyOpenGroupUrl(context: Context, thread: Recipient) { + if (!thread.isOpenGroupRecipient) { return } + val listener = context as? ConversationMenuListener ?: return + listener.copyOpenGroupUrl(thread) + } + private fun editClosedGroup(context: Context, thread: Recipient) { if (!thread.isClosedGroupRecipient) { return } val intent = Intent(context, EditClosedGroupActivity::class.java) @@ -344,6 +355,7 @@ object ConversationMenuHelper { fun block(deleteThread: Boolean = false) fun unblock() fun copySessionID(sessionId: String) + fun copyOpenGroupUrl(thread: Recipient) fun showExpiringMessagesDialog(thread: Recipient) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java b/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java index 26b20ab00a..976a21595d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java +++ b/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java @@ -57,7 +57,6 @@ import org.thoughtcrime.securesms.database.model.MessageRecord; import org.thoughtcrime.securesms.database.model.MmsMessageRecord; import org.thoughtcrime.securesms.database.model.ThreadRecord; import org.thoughtcrime.securesms.dependencies.DatabaseComponent; -import org.thoughtcrime.securesms.groups.OpenGroupMigrator; import org.thoughtcrime.securesms.mms.Slide; import org.thoughtcrime.securesms.mms.SlideDeck; import org.thoughtcrime.securesms.notifications.MarkReadReceiver; @@ -800,77 +799,6 @@ public class ThreadDatabase extends Database { return query; } - @NotNull - public List getHttpOxenOpenGroups() { - String where = TABLE_NAME+"."+ADDRESS+" LIKE ?"; - String selection = OpenGroupMigrator.HTTP_PREFIX+OpenGroupMigrator.OPEN_GET_SESSION_TRAILING_DOT_ENCODED +"%"; - SQLiteDatabase db = databaseHelper.getReadableDatabase(); - String query = createQuery(where, 0); - Cursor cursor = db.rawQuery(query, new String[]{selection}); - - if (cursor == null) { - return Collections.emptyList(); - } - List threads = new ArrayList<>(); - try { - Reader reader = readerFor(cursor); - ThreadRecord record; - while ((record = reader.getNext()) != null) { - threads.add(record); - } - } finally { - cursor.close(); - } - return threads; - } - - @NotNull - public List getLegacyOxenOpenGroups() { - String where = TABLE_NAME+"."+ADDRESS+" LIKE ?"; - String selection = OpenGroupMigrator.LEGACY_GROUP_ENCODED_ID+"%"; - SQLiteDatabase db = databaseHelper.getReadableDatabase(); - String query = createQuery(where, 0); - Cursor cursor = db.rawQuery(query, new String[]{selection}); - - if (cursor == null) { - return Collections.emptyList(); - } - List threads = new ArrayList<>(); - try { - Reader reader = readerFor(cursor); - ThreadRecord record; - while ((record = reader.getNext()) != null) { - threads.add(record); - } - } finally { - cursor.close(); - } - return threads; - } - - @NotNull - public List getHttpsOxenOpenGroups() { - String where = TABLE_NAME+"."+ADDRESS+" LIKE ?"; - String selection = OpenGroupMigrator.NEW_GROUP_ENCODED_ID+"%"; - SQLiteDatabase db = databaseHelper.getReadableDatabase(); - String query = createQuery(where, 0); - Cursor cursor = db.rawQuery(query, new String[]{selection}); - if (cursor == null) { - return Collections.emptyList(); - } - List threads = new ArrayList<>(); - try { - Reader reader = readerFor(cursor); - ThreadRecord record; - while ((record = reader.getNext()) != null) { - threads.add(record); - } - } finally { - cursor.close(); - } - return threads; - } - public void migrateEncodedGroup(long threadId, @NotNull String newEncodedGroupId) { ContentValues contentValues = new ContentValues(1); contentValues.put(ADDRESS, newEncodedGroupId); diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SQLCipherOpenHelper.java b/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SQLCipherOpenHelper.java index 4a48aa2446..df69c239fc 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SQLCipherOpenHelper.java +++ b/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SQLCipherOpenHelper.java @@ -97,25 +97,40 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper { private final DatabaseSecret databaseSecret; public SQLCipherOpenHelper(@NonNull Context context, @NonNull DatabaseSecret databaseSecret) { - super(context, DATABASE_NAME, databaseSecret.asString(), null, DATABASE_VERSION, MIN_DATABASE_VERSION, null, new SQLiteDatabaseHook() { - @Override - public void preKey(SQLiteConnection connection) { - SQLCipherOpenHelper.applySQLCipherPragmas(connection, true); - } - - @Override - public void postKey(SQLiteConnection connection) { - SQLCipherOpenHelper.applySQLCipherPragmas(connection, true); - - // if not vacuumed in a while, perform that operation - long currentTime = System.currentTimeMillis(); - // 7 days - if (currentTime - TextSecurePreferences.getLastVacuumTime(context) > 604_800_000) { - connection.execute("VACUUM;", null, null); - TextSecurePreferences.setLastVacuumNow(context); + super( + context, + DATABASE_NAME, + databaseSecret.asString(), + null, + DATABASE_VERSION, + MIN_DATABASE_VERSION, + null, + new SQLiteDatabaseHook() { + @Override + public void preKey(SQLiteConnection connection) { + SQLCipherOpenHelper.applySQLCipherPragmas(connection, true); } - } - }, true); + + @Override + public void postKey(SQLiteConnection connection) { + SQLCipherOpenHelper.applySQLCipherPragmas(connection, true); + + // if not vacuumed in a while, perform that operation + long currentTime = System.currentTimeMillis(); + // 7 days + if (currentTime - TextSecurePreferences.getLastVacuumTime(context) > 604_800_000) { + connection.execute("VACUUM;", null, null); + TextSecurePreferences.setLastVacuumNow(context); + } + } + }, + // Note: Now that we support concurrent database reads the migrations are actually non-blocking + // because of this we need to initially open the database with writeAheadLogging (WAL mode) disabled + // and enable it once the database officially opens it's connection (which will cause it to re-connect + // in WAL mode) - this is a little inefficient but will prevent SQL-related errors/crashes due to + // incomplete migrations + false + ); this.context = context.getApplicationContext(); this.databaseSecret = databaseSecret; @@ -150,11 +165,11 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper { // If the old SQLCipher3 database file doesn't exist then no need to do anything if (!oldDbFile.exists()) { return; } - try { - // Define the location for the new database - String newDbPath = context.getDatabasePath(DATABASE_NAME).getPath(); - File newDbFile = new File(newDbPath); + // Define the location for the new database + String newDbPath = context.getDatabasePath(DATABASE_NAME).getPath(); + File newDbFile = new File(newDbPath); + try { // If the new database file already exists then check if it's valid first, if it's in an // invalid state we should delete it and try to migrate again if (newDbFile.exists()) { @@ -162,10 +177,24 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper { // assume the user hasn't downgraded for some reason and made changes to the old database and // can remove the old database file (it won't be used anymore) if (oldDbFile.lastModified() <= newDbFile.lastModified()) { - // TODO: Delete 'CIPHER3_DATABASE_NAME' once enough time has past -// //noinspection ResultOfMethodCallIgnored -// oldDbFile.delete(); - return; + try { + SQLiteDatabase newDb = SQLCipherOpenHelper.open(newDbPath, databaseSecret, true); + int version = newDb.getVersion(); + newDb.close(); + + // Make sure the new database has it's version set correctly (if not then the migration didn't + // fully succeed and the database will try to create all it's tables and immediately fail so + // we will need to remove and remigrate) + if (version > 0) { + // TODO: Delete 'CIPHER3_DATABASE_NAME' once enough time has past +// //noinspection ResultOfMethodCallIgnored +// oldDbFile.delete(); + return; + } + } + catch (Exception e) { + Log.i(TAG, "Failed to retrieve version from new database, assuming invalid and remigrating"); + } } // If the old database does have newer changes then the new database could have stale/invalid @@ -207,6 +236,11 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper { catch (Exception e) { Log.e(TAG, "Migration from SQLCipher3 to SQLCipher4 failed", e); + // If an exception was thrown then we should remove the new database file (it's probably invalid) + if (!newDbFile.delete()) { + Log.e(TAG, "Unable to delete invalid new database file"); + } + // Notify the user of the issue so they know they can downgrade until the issue is fixed NotificationManager notificationManager = context.getSystemService(NotificationManager.class); String channelId = context.getString(R.string.NotificationChannel_failures); @@ -559,6 +593,15 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper { } } + @Override + public void onOpen(SQLiteDatabase db) { + super.onOpen(db); + + // Now that the database is officially open (ie. the migrations are completed) we want to enable + // write ahead logging (WAL mode) to officially support concurrent read connections + db.enableWriteAheadLogging(); + } + public void markCurrent(SQLiteDatabase db) { db.setVersion(DATABASE_VERSION); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/OpenGroupMigrator.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/OpenGroupMigrator.kt deleted file mode 100644 index 642d191614..0000000000 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/OpenGroupMigrator.kt +++ /dev/null @@ -1,139 +0,0 @@ -package org.thoughtcrime.securesms.groups - -import androidx.annotation.VisibleForTesting -import org.session.libsession.messaging.open_groups.OpenGroupApi -import org.session.libsession.utilities.recipients.Recipient -import org.session.libsignal.utilities.Hex -import org.thoughtcrime.securesms.database.model.ThreadRecord -import org.thoughtcrime.securesms.dependencies.DatabaseComponent - -object OpenGroupMigrator { - const val HTTP_PREFIX = "__loki_public_chat_group__!687474703a2f2f" - private const val HTTPS_PREFIX = "__loki_public_chat_group__!68747470733a2f2f" - const val OPEN_GET_SESSION_TRAILING_DOT_ENCODED = "6f70656e2e67657473657373696f6e2e6f72672e" - const val LEGACY_GROUP_ENCODED_ID = "__loki_public_chat_group__!687474703a2f2f3131362e3230332e37302e33332e" // old IP based toByteArray() - const val NEW_GROUP_ENCODED_ID = "__loki_public_chat_group__!68747470733a2f2f6f70656e2e67657473657373696f6e2e6f72672e" // new URL based toByteArray() - - data class OpenGroupMapping(val stub: String, val legacyThreadId: Long, val newThreadId: Long?) - - @VisibleForTesting - fun Recipient.roomStub(): String? { - if (!isOpenGroupRecipient) return null - val serialized = address.serialize() - if (serialized.startsWith(LEGACY_GROUP_ENCODED_ID)) { - return serialized.replace(LEGACY_GROUP_ENCODED_ID,"") - } else if (serialized.startsWith(NEW_GROUP_ENCODED_ID)) { - return serialized.replace(NEW_GROUP_ENCODED_ID,"") - } else if (serialized.startsWith(HTTP_PREFIX + OPEN_GET_SESSION_TRAILING_DOT_ENCODED)) { - return serialized.replace(HTTP_PREFIX + OPEN_GET_SESSION_TRAILING_DOT_ENCODED, "") - } - return null - } - - @VisibleForTesting - fun getExistingMappings(legacy: List, new: List): List { - val legacyStubsMapping = legacy.mapNotNull { thread -> - val stub = thread.recipient.roomStub() - stub?.let { it to thread.threadId } - } - val newStubsMapping = new.mapNotNull { thread -> - val stub = thread.recipient.roomStub() - stub?.let { it to thread.threadId } - } - return legacyStubsMapping.map { (legacyEncodedStub, legacyId) -> - // get 'new' open group thread ID if stubs match - OpenGroupMapping( - legacyEncodedStub, - legacyId, - newStubsMapping.firstOrNull { (newEncodedStub, _) -> newEncodedStub == legacyEncodedStub }?.second - ) - } - } - - @JvmStatic - fun migrate(databaseComponent: DatabaseComponent) { - // migrate thread db - val threadDb = databaseComponent.threadDatabase() - - val legacyOpenGroups = threadDb.legacyOxenOpenGroups - val httpBasedNewGroups = threadDb.httpOxenOpenGroups - if (legacyOpenGroups.isEmpty() && httpBasedNewGroups.isEmpty()) return // no need to migrate - - val newOpenGroups = threadDb.httpsOxenOpenGroups - val firstStepMigration = getExistingMappings(legacyOpenGroups, newOpenGroups) - - val secondStepMigration = getExistingMappings(httpBasedNewGroups, newOpenGroups) - - val groupDb = databaseComponent.groupDatabase() - val lokiApiDb = databaseComponent.lokiAPIDatabase() - val smsDb = databaseComponent.smsDatabase() - val mmsDb = databaseComponent.mmsDatabase() - val lokiMessageDatabase = databaseComponent.lokiMessageDatabase() - val lokiThreadDatabase = databaseComponent.lokiThreadDatabase() - - firstStepMigration.forEach { (stub, old, new) -> - val legacyEncodedGroupId = LEGACY_GROUP_ENCODED_ID+stub - if (new == null) { - val newEncodedGroupId = NEW_GROUP_ENCODED_ID+stub - // migrate thread and group encoded values - threadDb.migrateEncodedGroup(old, newEncodedGroupId) - groupDb.migrateEncodedGroup(legacyEncodedGroupId, newEncodedGroupId) - // migrate Loki API DB values - // decode the hex to bytes, decode byte array to string i.e. "oxen" or "session" - val decodedStub = Hex.fromStringCondensed(stub).decodeToString() - val legacyLokiServerId = "${OpenGroupApi.legacyDefaultServer}.$decodedStub" - val newLokiServerId = "${OpenGroupApi.defaultServer}.$decodedStub" - lokiApiDb.migrateLegacyOpenGroup(legacyLokiServerId, newLokiServerId) - // migrate loki thread db server info - val oldServerInfo = lokiThreadDatabase.getOpenGroupChat(old) - val newServerInfo = oldServerInfo!!.copy(server = OpenGroupApi.defaultServer, id = newLokiServerId) - lokiThreadDatabase.setOpenGroupChat(newServerInfo, old) - } else { - // has a legacy and a new one - // migrate SMS and MMS tables - smsDb.migrateThreadId(old, new) - mmsDb.migrateThreadId(old, new) - lokiMessageDatabase.migrateThreadId(old, new) - // delete group for legacy ID - groupDb.delete(legacyEncodedGroupId) - // delete thread for legacy ID - threadDb.deleteConversation(old) - lokiThreadDatabase.removeOpenGroupChat(old) - } - // maybe migrate jobs here - } - - secondStepMigration.forEach { (stub, old, new) -> - val legacyEncodedGroupId = HTTP_PREFIX + OPEN_GET_SESSION_TRAILING_DOT_ENCODED + stub - if (new == null) { - val newEncodedGroupId = NEW_GROUP_ENCODED_ID+stub - // migrate thread and group encoded values - threadDb.migrateEncodedGroup(old, newEncodedGroupId) - groupDb.migrateEncodedGroup(legacyEncodedGroupId, newEncodedGroupId) - // migrate Loki API DB values - // decode the hex to bytes, decode byte array to string i.e. "oxen" or "session" - val decodedStub = Hex.fromStringCondensed(stub).decodeToString() - val legacyLokiServerId = "${OpenGroupApi.httpDefaultServer}.$decodedStub" - val newLokiServerId = "${OpenGroupApi.defaultServer}.$decodedStub" - lokiApiDb.migrateLegacyOpenGroup(legacyLokiServerId, newLokiServerId) - // migrate loki thread db server info - val oldServerInfo = lokiThreadDatabase.getOpenGroupChat(old) - val newServerInfo = oldServerInfo!!.copy(server = OpenGroupApi.defaultServer, id = newLokiServerId) - lokiThreadDatabase.setOpenGroupChat(newServerInfo, old) - } else { - // has a legacy and a new one - // migrate SMS and MMS tables - smsDb.migrateThreadId(old, new) - mmsDb.migrateThreadId(old, new) - lokiMessageDatabase.migrateThreadId(old, new) - // delete group for legacy ID - groupDb.delete(legacyEncodedGroupId) - // delete thread for legacy ID - threadDb.deleteConversation(old) - lokiThreadDatabase.removeOpenGroupChat(old) - } - // maybe migrate jobs here - } - - } -} \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/notifications/BackgroundPollWorker.kt b/app/src/main/java/org/thoughtcrime/securesms/notifications/BackgroundPollWorker.kt index 5a0438e15d..9b8ea5824d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/notifications/BackgroundPollWorker.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/notifications/BackgroundPollWorker.kt @@ -60,7 +60,7 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor // FIXME: Using a job here seems like a bad idea... MessageReceiveParameters(envelope.toByteArray(), serverHash, null) } - BatchMessageReceiveJob(params).executeAsync() + BatchMessageReceiveJob(params).executeAsync("background") } promises.add(dmsPromise) diff --git a/app/src/main/res/menu/menu_conversation_open_group.xml b/app/src/main/res/menu/menu_conversation_open_group.xml index 6ff025aadb..1bbb2d76de 100644 --- a/app/src/main/res/menu/menu_conversation_open_group.xml +++ b/app/src/main/res/menu/menu_conversation_open_group.xml @@ -2,6 +2,10 @@ + + diff --git a/app/src/main/res/values/strings.xml b/app/src/main/res/values/strings.xml index d0ee94cf1d..7c39714c09 100644 --- a/app/src/main/res/values/strings.xml +++ b/app/src/main/res/values/strings.xml @@ -77,6 +77,7 @@ Attachment exceeds size limits for the type of message you\'re sending. Unable to record audio! There is no app available to handle this link on your device. + Copy Community URL Add members Session needs microphone access to send audio messages. Session needs microphone access to send audio messages, but it has been permanently denied. Please continue to app settings, select \"Permissions\", and enable \"Microphone\". diff --git a/app/src/test/java/org/thoughtcrime/securesms/util/OpenGroupMigrationTests.kt b/app/src/test/java/org/thoughtcrime/securesms/util/OpenGroupMigrationTests.kt deleted file mode 100644 index dcf8ca231b..0000000000 --- a/app/src/test/java/org/thoughtcrime/securesms/util/OpenGroupMigrationTests.kt +++ /dev/null @@ -1,281 +0,0 @@ -package org.thoughtcrime.securesms.util - -import org.junit.Assert.assertEquals -import org.junit.Assert.assertTrue -import org.junit.Test -import org.mockito.kotlin.KStubbing -import org.mockito.kotlin.argumentCaptor -import org.mockito.kotlin.doAnswer -import org.mockito.kotlin.doReturn -import org.mockito.kotlin.eq -import org.mockito.kotlin.mock -import org.mockito.kotlin.verify -import org.mockito.kotlin.verifyNoMoreInteractions -import org.session.libsession.messaging.open_groups.OpenGroup -import org.session.libsession.messaging.open_groups.OpenGroupApi -import org.session.libsession.utilities.Address -import org.session.libsession.utilities.recipients.Recipient -import org.thoughtcrime.securesms.database.GroupDatabase -import org.thoughtcrime.securesms.database.LokiAPIDatabase -import org.thoughtcrime.securesms.database.LokiMessageDatabase -import org.thoughtcrime.securesms.database.LokiThreadDatabase -import org.thoughtcrime.securesms.database.MmsDatabase -import org.thoughtcrime.securesms.database.SmsDatabase -import org.thoughtcrime.securesms.database.ThreadDatabase -import org.thoughtcrime.securesms.database.model.ThreadRecord -import org.thoughtcrime.securesms.dependencies.DatabaseComponent -import org.thoughtcrime.securesms.groups.OpenGroupMigrator -import org.thoughtcrime.securesms.groups.OpenGroupMigrator.OpenGroupMapping -import org.thoughtcrime.securesms.groups.OpenGroupMigrator.roomStub - -class OpenGroupMigrationTests { - - companion object { - const val EXAMPLE_LEGACY_ENCODED_OPEN_GROUP = "__loki_public_chat_group__!687474703a2f2f3131362e3230332e37302e33332e6f78656e" - const val EXAMPLE_NEW_ENCODED_OPEN_GROUP = "__loki_public_chat_group__!68747470733a2f2f6f70656e2e67657473657373696f6e2e6f72672e6f78656e" - const val OXEN_STUB_HEX = "6f78656e" - - const val EXAMPLE_LEGACY_SERVER_ID = "http://116.203.70.33.oxen" - const val EXAMPLE_NEW_SERVER_ID = "https://open.getsession.org.oxen" - - const val LEGACY_THREAD_ID = 1L - const val NEW_THREAD_ID = 2L - } - - private fun legacyOpenGroupRecipient(additionalMocks: ((KStubbing) -> Unit) ? = null) = mock { - on { address } doReturn Address.fromSerialized(EXAMPLE_LEGACY_ENCODED_OPEN_GROUP) - on { isOpenGroupRecipient } doReturn true - additionalMocks?.let { it(this) } - } - - private fun newOpenGroupRecipient(additionalMocks: ((KStubbing) -> Unit) ? = null) = mock { - on { address } doReturn Address.fromSerialized(EXAMPLE_NEW_ENCODED_OPEN_GROUP) - on { isOpenGroupRecipient } doReturn true - additionalMocks?.let { it(this) } - } - - private fun legacyThreadRecord(additionalRecipientMocks: ((KStubbing) -> Unit) ? = null, additionalThreadMocks: ((KStubbing) -> Unit)? = null) = mock { - val returnedRecipient = legacyOpenGroupRecipient(additionalRecipientMocks) - on { recipient } doReturn returnedRecipient - on { threadId } doReturn LEGACY_THREAD_ID - } - - private fun newThreadRecord(additionalRecipientMocks: ((KStubbing) -> Unit)? = null, additionalThreadMocks: ((KStubbing) -> Unit)? = null) = mock { - val returnedRecipient = newOpenGroupRecipient(additionalRecipientMocks) - on { recipient } doReturn returnedRecipient - on { threadId } doReturn NEW_THREAD_ID - } - - @Test - fun `it should generate the correct room stubs for legacy groups`() { - val mockRecipient = legacyOpenGroupRecipient() - assertEquals(OXEN_STUB_HEX, mockRecipient.roomStub()) - } - - @Test - fun `it should generate the correct room stubs for new groups`() { - val mockNewRecipient = newOpenGroupRecipient() - assertEquals(OXEN_STUB_HEX, mockNewRecipient.roomStub()) - } - - @Test - fun `it should return correct mappings`() { - val legacyThread = legacyThreadRecord() - val newThread = newThreadRecord() - - val expectedMapping = listOf( - OpenGroupMapping(OXEN_STUB_HEX, LEGACY_THREAD_ID, NEW_THREAD_ID) - ) - - assertTrue(expectedMapping.containsAll(OpenGroupMigrator.getExistingMappings(listOf(legacyThread), listOf(newThread)))) - } - - @Test - fun `it should return no mappings if there are no legacy open groups`() { - val mappings = OpenGroupMigrator.getExistingMappings(listOf(), listOf()) - assertTrue(mappings.isEmpty()) - } - - @Test - fun `it should return no mappings if there are only new open groups`() { - val newThread = newThreadRecord() - val mappings = OpenGroupMigrator.getExistingMappings(emptyList(), listOf(newThread)) - assertTrue(mappings.isEmpty()) - } - - @Test - fun `it should return null new thread in mappings if there are only legacy open groups`() { - val legacyThread = legacyThreadRecord() - val mappings = OpenGroupMigrator.getExistingMappings(listOf(legacyThread), emptyList()) - val expectedMappings = listOf( - OpenGroupMapping(OXEN_STUB_HEX, LEGACY_THREAD_ID, null) - ) - assertTrue(expectedMappings.containsAll(mappings)) - } - - @Test - fun `test migration thread DB calls legacy and returns if no legacy official groups`() { - val mockedThreadDb = mock { - on { legacyOxenOpenGroups } doReturn emptyList() - } - val mockedDbComponent = mock { - on { threadDatabase() } doReturn mockedThreadDb - } - - OpenGroupMigrator.migrate(mockedDbComponent) - - verify(mockedDbComponent).threadDatabase() - verify(mockedThreadDb).legacyOxenOpenGroups - verifyNoMoreInteractions(mockedThreadDb) - } - - @Test - fun `it should migrate on thread, group and loki dbs with correct values for legacy only migration`() { - // mock threadDB - val capturedThreadId = argumentCaptor() - val capturedNewEncoded = argumentCaptor() - val mockedThreadDb = mock { - val legacyThreadRecord = legacyThreadRecord() - on { legacyOxenOpenGroups } doReturn listOf(legacyThreadRecord) - on { httpsOxenOpenGroups } doReturn emptyList() - on { migrateEncodedGroup(capturedThreadId.capture(), capturedNewEncoded.capture()) } doAnswer {} - } - - // mock groupDB - val capturedGroupLegacyEncoded = argumentCaptor() - val capturedGroupNewEncoded = argumentCaptor() - val mockedGroupDb = mock { - on { - migrateEncodedGroup( - capturedGroupLegacyEncoded.capture(), - capturedGroupNewEncoded.capture() - ) - } doAnswer {} - } - - // mock LokiAPIDB - val capturedLokiLegacyGroup = argumentCaptor() - val capturedLokiNewGroup = argumentCaptor() - val mockedLokiApi = mock { - on { migrateLegacyOpenGroup(capturedLokiLegacyGroup.capture(), capturedLokiNewGroup.capture()) } doAnswer {} - } - - val pubKey = OpenGroupApi.defaultServerPublicKey - val room = "oxen" - val legacyServer = OpenGroupApi.legacyDefaultServer - val newServer = OpenGroupApi.defaultServer - - val lokiThreadOpenGroup = argumentCaptor() - val mockedLokiThreadDb = mock { - on { getOpenGroupChat(eq(LEGACY_THREAD_ID)) } doReturn OpenGroup(legacyServer, room, "Oxen", 0, pubKey) - on { setOpenGroupChat(lokiThreadOpenGroup.capture(), eq(LEGACY_THREAD_ID)) } doAnswer {} - } - - val mockedDbComponent = mock { - on { threadDatabase() } doReturn mockedThreadDb - on { groupDatabase() } doReturn mockedGroupDb - on { lokiAPIDatabase() } doReturn mockedLokiApi - on { lokiThreadDatabase() } doReturn mockedLokiThreadDb - } - - OpenGroupMigrator.migrate(mockedDbComponent) - - // expect threadDB migration to reflect new thread values: - // thread ID = 1, encoded ID = new encoded ID - assertEquals(LEGACY_THREAD_ID, capturedThreadId.firstValue) - assertEquals(EXAMPLE_NEW_ENCODED_OPEN_GROUP, capturedNewEncoded.firstValue) - - // expect groupDB migration to reflect new thread values: - // legacy encoded ID, new encoded ID - assertEquals(EXAMPLE_LEGACY_ENCODED_OPEN_GROUP, capturedGroupLegacyEncoded.firstValue) - assertEquals(EXAMPLE_NEW_ENCODED_OPEN_GROUP, capturedGroupNewEncoded.firstValue) - - // expect Loki API DB migration to reflect new thread values: - assertEquals("${OpenGroupApi.legacyDefaultServer}.oxen", capturedLokiLegacyGroup.firstValue) - assertEquals("${OpenGroupApi.defaultServer}.oxen", capturedLokiNewGroup.firstValue) - - assertEquals(newServer, lokiThreadOpenGroup.firstValue.server) - - } - - @Test - fun `it should migrate and delete legacy thread with conflicting new and old values`() { - - // mock threadDB - val capturedThreadId = argumentCaptor() - val mockedThreadDb = mock { - val legacyThreadRecord = legacyThreadRecord() - val newThreadRecord = newThreadRecord() - on { legacyOxenOpenGroups } doReturn listOf(legacyThreadRecord) - on { httpsOxenOpenGroups } doReturn listOf(newThreadRecord) - on { deleteConversation(capturedThreadId.capture()) } doAnswer {} - } - - // mock groupDB - val capturedGroupLegacyEncoded = argumentCaptor() - val mockedGroupDb = mock { - on { delete(capturedGroupLegacyEncoded.capture()) } doReturn true - } - - // mock LokiAPIDB - val capturedLokiLegacyGroup = argumentCaptor() - val capturedLokiNewGroup = argumentCaptor() - val mockedLokiApi = mock { - on { migrateLegacyOpenGroup(capturedLokiLegacyGroup.capture(), capturedLokiNewGroup.capture()) } doAnswer {} - } - - // mock messaging dbs - val migrateMmsFromThreadId = argumentCaptor() - val migrateMmsToThreadId = argumentCaptor() - - val mockedMmsDb = mock { - on { migrateThreadId(migrateMmsFromThreadId.capture(), migrateMmsToThreadId.capture()) } doAnswer {} - } - - val migrateSmsFromThreadId = argumentCaptor() - val migrateSmsToThreadId = argumentCaptor() - val mockedSmsDb = mock { - on { migrateThreadId(migrateSmsFromThreadId.capture(), migrateSmsToThreadId.capture()) } doAnswer {} - } - - val lokiFromThreadId = argumentCaptor() - val lokiToThreadId = argumentCaptor() - val mockedLokiMessageDatabase = mock { - on { migrateThreadId(lokiFromThreadId.capture(), lokiToThreadId.capture()) } doAnswer {} - } - - val mockedLokiThreadDb = mock { - on { removeOpenGroupChat(eq(LEGACY_THREAD_ID)) } doAnswer {} - } - - val mockedDbComponent = mock { - on { threadDatabase() } doReturn mockedThreadDb - on { groupDatabase() } doReturn mockedGroupDb - on { lokiAPIDatabase() } doReturn mockedLokiApi - on { mmsDatabase() } doReturn mockedMmsDb - on { smsDatabase() } doReturn mockedSmsDb - on { lokiMessageDatabase() } doReturn mockedLokiMessageDatabase - on { lokiThreadDatabase() } doReturn mockedLokiThreadDb - } - - OpenGroupMigrator.migrate(mockedDbComponent) - - // should delete thread by thread ID - assertEquals(LEGACY_THREAD_ID, capturedThreadId.firstValue) - - // should delete group by legacy encoded ID - assertEquals(EXAMPLE_LEGACY_ENCODED_OPEN_GROUP, capturedGroupLegacyEncoded.firstValue) - - // should migrate SMS from legacy thread ID to new thread ID - assertEquals(LEGACY_THREAD_ID, migrateSmsFromThreadId.firstValue) - assertEquals(NEW_THREAD_ID, migrateSmsToThreadId.firstValue) - - // should migrate MMS from legacy thread ID to new thread ID - assertEquals(LEGACY_THREAD_ID, migrateMmsFromThreadId.firstValue) - assertEquals(NEW_THREAD_ID, migrateMmsToThreadId.firstValue) - - } - - - -} \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt index 826df3ef8d..ef1d7567b3 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt @@ -42,7 +42,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) private val TS_INCOMING_MESSAGE_ID_KEY = "tsIncoming_message_id" } - override fun execute() { + override fun execute(dispatcherName: String) { val storage = MessagingModuleConfiguration.shared.storage val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider val threadID = storage.getThreadIdForMms(databaseMessageID) @@ -59,7 +59,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) Log.d("AttachmentDownloadJob", "Setting attachment state = failed, don't have attachment") messageDataProvider.setAttachmentState(AttachmentState.FAILED, AttachmentId(attachmentID,0), databaseMessageID) } - this.handlePermanentFailure(exception) + this.handlePermanentFailure(dispatcherName, exception) } else if (exception == Error.DuplicateData) { attachment?.let { id -> Log.d("AttachmentDownloadJob", "Setting attachment state = done from duplicate data") @@ -68,7 +68,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) Log.d("AttachmentDownloadJob", "Setting attachment state = done from duplicate data") messageDataProvider.setAttachmentState(AttachmentState.DONE, AttachmentId(attachmentID,0), databaseMessageID) } - this.handleSuccess() + this.handleSuccess(dispatcherName) } else { if (failureCount + 1 >= maxFailureCount) { attachment?.let { id -> @@ -79,7 +79,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) messageDataProvider.setAttachmentState(AttachmentState.FAILED, AttachmentId(attachmentID,0), databaseMessageID) } } - this.handleFailure(exception) + this.handleFailure(dispatcherName, exception) } } @@ -150,7 +150,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) Log.d("AttachmentDownloadJob", "deleting tempfile") tempFile.delete() Log.d("AttachmentDownloadJob", "succeeding job") - handleSuccess() + handleSuccess(dispatcherName) } catch (e: Exception) { Log.e("AttachmentDownloadJob", "Error processing attachment download", e) tempFile?.delete() @@ -169,17 +169,17 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) } } - private fun handleSuccess() { + private fun handleSuccess(dispatcherName: String) { Log.w("AttachmentDownloadJob", "Attachment downloaded successfully.") - delegate?.handleJobSucceeded(this) + delegate?.handleJobSucceeded(this, dispatcherName) } - private fun handlePermanentFailure(e: Exception) { - delegate?.handleJobFailedPermanently(this, e) + private fun handlePermanentFailure(dispatcherName: String, e: Exception) { + delegate?.handleJobFailedPermanently(this, dispatcherName, e) } - private fun handleFailure(e: Exception) { - delegate?.handleJobFailed(this, e) + private fun handleFailure(dispatcherName: String, e: Exception) { + delegate?.handleJobFailed(this, dispatcherName, e) } private fun createTempFile(): File { diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt index 360207af43..cd4189a653 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt @@ -45,29 +45,29 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess private val MESSAGE_SEND_JOB_ID_KEY = "message_send_job_id" } - override fun execute() { + override fun execute(dispatcherName: String) { try { val storage = MessagingModuleConfiguration.shared.storage val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider val attachment = messageDataProvider.getScaledSignalAttachmentStream(attachmentID) - ?: return handleFailure(Error.NoAttachment) + ?: return handleFailure(dispatcherName, Error.NoAttachment) val openGroup = storage.getOpenGroup(threadID.toLong()) if (openGroup != null) { val keyAndResult = upload(attachment, openGroup.server, false) { OpenGroupApi.upload(it, openGroup.room, openGroup.server) } - handleSuccess(attachment, keyAndResult.first, keyAndResult.second) + handleSuccess(dispatcherName, attachment, keyAndResult.first, keyAndResult.second) } else { val keyAndResult = upload(attachment, FileServerApi.server, true) { FileServerApi.upload(it) } - handleSuccess(attachment, keyAndResult.first, keyAndResult.second) + handleSuccess(dispatcherName, attachment, keyAndResult.first, keyAndResult.second) } } catch (e: java.lang.Exception) { if (e == Error.NoAttachment) { - this.handlePermanentFailure(e) + this.handlePermanentFailure(dispatcherName, e) } else { - this.handleFailure(e) + this.handleFailure(dispatcherName, e) } } } @@ -104,9 +104,9 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess return Pair(key, UploadResult(id, "${server}/file/$id", digest)) } - private fun handleSuccess(attachment: SignalServiceAttachmentStream, attachmentKey: ByteArray, uploadResult: UploadResult) { + private fun handleSuccess(dispatcherName: String, attachment: SignalServiceAttachmentStream, attachmentKey: ByteArray, uploadResult: UploadResult) { Log.d(TAG, "Attachment uploaded successfully.") - delegate?.handleJobSucceeded(this) + delegate?.handleJobSucceeded(this, dispatcherName) val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider messageDataProvider.handleSuccessfulAttachmentUpload(attachmentID, attachment, attachmentKey, uploadResult) if (attachment.contentType.startsWith("audio/")) { @@ -144,16 +144,16 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess storage.resumeMessageSendJobIfNeeded(messageSendJobID) } - private fun handlePermanentFailure(e: Exception) { + private fun handlePermanentFailure(dispatcherName: String, e: Exception) { Log.w(TAG, "Attachment upload failed permanently due to error: $this.") - delegate?.handleJobFailedPermanently(this, e) + delegate?.handleJobFailedPermanently(this, dispatcherName, e) MessagingModuleConfiguration.shared.messageDataProvider.handleFailedAttachmentUpload(attachmentID) failAssociatedMessageSendJob(e) } - private fun handleFailure(e: Exception) { + private fun handleFailure(dispatcherName: String, e: Exception) { Log.w(TAG, "Attachment upload failed due to error: $this.") - delegate?.handleJobFailed(this, e) + delegate?.handleJobFailed(this, dispatcherName, e) if (failureCount + 1 >= maxFailureCount) { failAssociatedMessageSendJob(e) } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/BackgroundGroupAddJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/BackgroundGroupAddJob.kt index c679724b9f..ef67408fb3 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/BackgroundGroupAddJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/BackgroundGroupAddJob.kt @@ -29,14 +29,14 @@ class BackgroundGroupAddJob(val joinUrl: String): Job { return "$server.$room" } - override fun execute() { + override fun execute(dispatcherName: String) { try { val openGroup = OpenGroupUrlParser.parseUrl(joinUrl) val storage = MessagingModuleConfiguration.shared.storage val allOpenGroups = storage.getAllOpenGroups().map { it.value.joinURL } if (allOpenGroups.contains(openGroup.joinUrl())) { Log.e("OpenGroupDispatcher", "Failed to add group because", DuplicateGroupException()) - delegate?.handleJobFailed(this, DuplicateGroupException()) + delegate?.handleJobFailed(this, dispatcherName, DuplicateGroupException()) return } // get image @@ -50,11 +50,11 @@ class BackgroundGroupAddJob(val joinUrl: String): Job { storage.onOpenGroupAdded(openGroup.server) } catch (e: Exception) { Log.e("OpenGroupDispatcher", "Failed to add group because",e) - delegate?.handleJobFailed(this, e) + delegate?.handleJobFailed(this, dispatcherName, e) return } Log.d("Loki", "Group added successfully") - delegate?.handleJobSucceeded(this) + delegate?.handleJobSucceeded(this, dispatcherName) } override fun serialize(): Data = Data.Builder() diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/BatchMessageReceiveJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/BatchMessageReceiveJob.kt index 18a8cc4aed..54a6551dc4 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/BatchMessageReceiveJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/BatchMessageReceiveJob.kt @@ -66,11 +66,11 @@ class BatchMessageReceiveJob( return storage.getOrCreateThreadIdFor(senderOrSync, message.groupPublicKey, openGroupID) } - override fun execute() { - executeAsync().get() + override fun execute(dispatcherName: String) { + executeAsync(dispatcherName).get() } - fun executeAsync(): Promise { + fun executeAsync(dispatcherName: String): Promise { return task { val threadMap = mutableMapOf>() val storage = MessagingModuleConfiguration.shared.storage @@ -188,19 +188,21 @@ class BatchMessageReceiveJob( deferredThreadMap.awaitAll() } if (failures.isEmpty()) { - handleSuccess() + handleSuccess(dispatcherName) } else { - handleFailure() + handleFailure(dispatcherName) } } } - private fun handleSuccess() { - this.delegate?.handleJobSucceeded(this) + private fun handleSuccess(dispatcherName: String) { + Log.i(TAG, "Completed processing of ${messages.size} messages") + this.delegate?.handleJobSucceeded(this, dispatcherName) } - private fun handleFailure() { - this.delegate?.handleJobFailed(this, Exception("One or more jobs resulted in failure")) + private fun handleFailure(dispatcherName: String) { + Log.i(TAG, "Handling failure of ${failures.size} messages (${messages.size - failures.size} processed successfully)") + this.delegate?.handleJobFailed(this, dispatcherName, Exception("One or more jobs resulted in failure")) } override fun serialize(): Data { diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/GroupAvatarDownloadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/GroupAvatarDownloadJob.kt index 02f792117c..6429d760ae 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/GroupAvatarDownloadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/GroupAvatarDownloadJob.kt @@ -12,7 +12,7 @@ class GroupAvatarDownloadJob(val room: String, val server: String) : Job { override var failureCount: Int = 0 override val maxFailureCount: Int = 10 - override fun execute() { + override fun execute(dispatcherName: String) { val storage = MessagingModuleConfiguration.shared.storage val imageId = storage.getOpenGroup(room, server)?.imageId ?: return try { @@ -20,9 +20,9 @@ class GroupAvatarDownloadJob(val room: String, val server: String) : Job { val groupId = GroupUtil.getEncodedOpenGroupID("$server.$room".toByteArray()) storage.updateProfilePicture(groupId, bytes) storage.updateTimestampUpdated(groupId, System.currentTimeMillis()) - delegate?.handleJobSucceeded(this) + delegate?.handleJobSucceeded(this, dispatcherName) } catch (e: Exception) { - delegate?.handleJobFailed(this, e) + delegate?.handleJobFailed(this, dispatcherName, e) } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt index 74feb83a61..74e324f0ea 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt @@ -17,7 +17,7 @@ interface Job { internal const val MAX_BUFFER_SIZE = 1_000_000 // bytes } - fun execute() + fun execute(dispatcherName: String) fun serialize(): Data diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobDelegate.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobDelegate.kt index 535ea27f3c..769458ab6d 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobDelegate.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobDelegate.kt @@ -2,7 +2,7 @@ package org.session.libsession.messaging.jobs interface JobDelegate { - fun handleJobSucceeded(job: Job) - fun handleJobFailed(job: Job, error: Exception) - fun handleJobFailedPermanently(job: Job, error: Exception) + fun handleJobSucceeded(job: Job, dispatcherName: String) + fun handleJobFailed(job: Job, dispatcherName: String, error: Exception) + fun handleJobFailedPermanently(job: Job, dispatcherName: String, error: Exception) } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index 8e46f275f2..b78590c729 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -53,7 +53,7 @@ class JobQueue : JobDelegate { } if (openGroupId.isNullOrEmpty()) { Log.e("OpenGroupDispatcher", "Open Group ID was null on ${job.javaClass.simpleName}") - handleJobFailedPermanently(job, NullPointerException("Open Group ID was null")) + handleJobFailedPermanently(job, name, NullPointerException("Open Group ID was null")) } else { val groupChannel = if (!openGroupChannels.containsKey(openGroupId)) { Log.d("OpenGroupDispatcher", "Creating ${openGroupId.hashCode()} channel") @@ -95,9 +95,16 @@ class JobQueue : JobDelegate { } private fun Job.process(dispatcherName: String) { - Log.d(dispatcherName,"processJob: ${javaClass.simpleName}") + Log.d(dispatcherName,"processJob: ${javaClass.simpleName} (id: $id)") delegate = this@JobQueue - execute() + + try { + execute(dispatcherName) + } + catch (e: Exception) { + Log.d(dispatcherName, "unhandledJobException: ${javaClass.simpleName} (id: $id)") + this@JobQueue.handleJobFailed(this, dispatcherName, e) + } } init { @@ -177,7 +184,7 @@ class JobQueue : JobDelegate { return } if (!pendingJobIds.add(id)) { - Log.e("Loki","tried to re-queue pending/in-progress job") + Log.e("Loki","tried to re-queue pending/in-progress job (id: $id)") return } queue.trySend(job) @@ -196,7 +203,7 @@ class JobQueue : JobDelegate { } } pendingJobs.sortedBy { it.id }.forEach { job -> - Log.i("Loki", "Resuming pending job of type: ${job::class.simpleName}.") + Log.i("Loki", "Resuming pending job of type: ${job::class.simpleName} (id: ${job.id}).") queue.trySend(job) // Offer always called on unlimited capacity } } @@ -223,21 +230,21 @@ class JobQueue : JobDelegate { } } - override fun handleJobSucceeded(job: Job) { + override fun handleJobSucceeded(job: Job, dispatcherName: String) { val jobId = job.id ?: return MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(jobId) pendingJobIds.remove(jobId) } - override fun handleJobFailed(job: Job, error: Exception) { + override fun handleJobFailed(job: Job, dispatcherName: String, error: Exception) { // Canceled val storage = MessagingModuleConfiguration.shared.storage if (storage.isJobCanceled(job)) { - return Log.i("Loki", "${job::class.simpleName} canceled.") + return Log.i("Loki", "${job::class.simpleName} canceled (id: ${job.id}).") } // Message send jobs waiting for the attachment to upload if (job is MessageSendJob && error is MessageSendJob.AwaitingAttachmentUploadException) { - Log.i("Loki", "Message send job waiting for attachment upload to finish.") + Log.i("Loki", "Message send job waiting for attachment upload to finish (id: ${job.id}).") return } @@ -255,21 +262,22 @@ class JobQueue : JobDelegate { job.failureCount += 1 if (job.failureCount >= job.maxFailureCount) { - handleJobFailedPermanently(job, error) + handleJobFailedPermanently(job, dispatcherName, error) } else { storage.persistJob(job) val retryInterval = getRetryInterval(job) - Log.i("Loki", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).") + Log.i("Loki", "${job::class.simpleName} failed (id: ${job.id}); scheduling retry (failure count is ${job.failureCount}).") timer.schedule(delay = retryInterval) { - Log.i("Loki", "Retrying ${job::class.simpleName}.") + Log.i("Loki", "Retrying ${job::class.simpleName} (id: ${job.id}).") queue.trySend(job) } } } - override fun handleJobFailedPermanently(job: Job, error: Exception) { + override fun handleJobFailedPermanently(job: Job, dispatcherName: String, error: Exception) { val jobId = job.id ?: return handleJobFailedPermanently(jobId) + Log.d(dispatcherName, "permanentlyFailedJob: ${javaClass.simpleName} (id: ${job.id})") } private fun handleJobFailedPermanently(jobId: String) { diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt index 439fbb7a3a..2ba33b5632 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt @@ -25,11 +25,11 @@ class MessageReceiveJob(val data: ByteArray, val serverHash: String? = null, val private val OPEN_GROUP_ID_KEY = "open_group_id" } - override fun execute() { - executeAsync().get() + override fun execute(dispatcherName: String) { + executeAsync(dispatcherName).get() } - fun executeAsync(): Promise { + fun executeAsync(dispatcherName: String): Promise { val deferred = deferred() try { val isRetry: Boolean = failureCount != 0 @@ -39,32 +39,32 @@ class MessageReceiveJob(val data: ByteArray, val serverHash: String? = null, val val (message, proto) = MessageReceiver.parse(this.data, this.openGroupMessageServerID, openGroupPublicKey = serverPublicKey) message.serverHash = serverHash MessageReceiver.handle(message, proto, this.openGroupID) - this.handleSuccess() + this.handleSuccess(dispatcherName) deferred.resolve(Unit) } catch (e: Exception) { Log.e(TAG, "Couldn't receive message.", e) if (e is MessageReceiver.Error && !e.isRetryable) { Log.e("Loki", "Message receive job permanently failed.", e) - this.handlePermanentFailure(e) + this.handlePermanentFailure(dispatcherName, e) } else { Log.e("Loki", "Couldn't receive message.", e) - this.handleFailure(e) + this.handleFailure(dispatcherName, e) } deferred.resolve(Unit) // The promise is just used to keep track of when we're done } return deferred.promise } - private fun handleSuccess() { - delegate?.handleJobSucceeded(this) + private fun handleSuccess(dispatcherName: String) { + delegate?.handleJobSucceeded(this, dispatcherName) } - private fun handlePermanentFailure(e: Exception) { - delegate?.handleJobFailedPermanently(this, e) + private fun handlePermanentFailure(dispatcherName: String, e: Exception) { + delegate?.handleJobFailedPermanently(this, dispatcherName, e) } - private fun handleFailure(e: Exception) { - delegate?.handleJobFailed(this, e) + private fun handleFailure(dispatcherName: String, e: Exception) { + delegate?.handleJobFailed(this, dispatcherName, e) } override fun serialize(): Data { diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt index 8ce1adf481..524338592c 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt @@ -33,7 +33,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { private val DESTINATION_KEY = "destination" } - override fun execute() { + override fun execute(dispatcherName: String) { val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider val message = message as? VisibleMessage val storage = MessagingModuleConfiguration.shared.storage @@ -61,12 +61,12 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { } } if (attachmentsToUpload.isNotEmpty()) { - this.handleFailure(AwaitingAttachmentUploadException) + this.handleFailure(dispatcherName, AwaitingAttachmentUploadException) return } // Wait for all attachments to upload before continuing } val promise = MessageSender.send(this.message, this.destination).success { - this.handleSuccess() + this.handleSuccess(dispatcherName) }.fail { exception -> var logStacktrace = true @@ -75,14 +75,14 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { is HTTP.HTTPRequestFailedException -> { logStacktrace = false - if (exception.statusCode == 429) { this.handlePermanentFailure(exception) } - else { this.handleFailure(exception) } + if (exception.statusCode == 429) { this.handlePermanentFailure(dispatcherName, exception) } + else { this.handleFailure(dispatcherName, exception) } } is MessageSender.Error -> { - if (!exception.isRetryable) { this.handlePermanentFailure(exception) } - else { this.handleFailure(exception) } + if (!exception.isRetryable) { this.handlePermanentFailure(dispatcherName, exception) } + else { this.handleFailure(dispatcherName, exception) } } - else -> this.handleFailure(exception) + else -> this.handleFailure(dispatcherName, exception) } if (logStacktrace) { Log.e(TAG, "Couldn't send message due to error", exception) } @@ -95,15 +95,15 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { } } - private fun handleSuccess() { - delegate?.handleJobSucceeded(this) + private fun handleSuccess(dispatcherName: String) { + delegate?.handleJobSucceeded(this, dispatcherName) } - private fun handlePermanentFailure(error: Exception) { - delegate?.handleJobFailedPermanently(this, error) + private fun handlePermanentFailure(dispatcherName: String, error: Exception) { + delegate?.handleJobFailedPermanently(this, dispatcherName, error) } - private fun handleFailure(error: Exception) { + private fun handleFailure(dispatcherName: String, error: Exception) { Log.w(TAG, "Failed to send $message::class.simpleName.") val message = message as? VisibleMessage if (message != null) { @@ -111,7 +111,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { return // The message has been deleted } } - delegate?.handleJobFailed(this, error) + delegate?.handleJobFailed(this, dispatcherName, error) } override fun serialize(): Data { diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt index 5c393c97b5..25fb2194c8 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt @@ -32,7 +32,7 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job { private val MESSAGE_KEY = "message" } - override fun execute() { + override fun execute(dispatcherName: String) { val server = PushNotificationAPI.server val parameters = mapOf( "data" to message.data, "send_to" to message.recipient ) val url = "${server}/notify" @@ -48,18 +48,18 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job { Log.d("Loki", "Couldn't notify PN server due to error: $exception.") } }.success { - handleSuccess() + handleSuccess(dispatcherName) }. fail { - handleFailure(it) + handleFailure(dispatcherName, it) } } - private fun handleSuccess() { - delegate?.handleJobSucceeded(this) + private fun handleSuccess(dispatcherName: String) { + delegate?.handleJobSucceeded(this, dispatcherName) } - private fun handleFailure(error: Exception) { - delegate?.handleJobFailed(this, error) + private fun handleFailure(dispatcherName: String, error: Exception) { + delegate?.handleJobFailed(this, dispatcherName, error) } override fun serialize(): Data { diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/OpenGroupDeleteJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/OpenGroupDeleteJob.kt index 1fb2d0df22..4c76f87633 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/OpenGroupDeleteJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/OpenGroupDeleteJob.kt @@ -19,7 +19,7 @@ class OpenGroupDeleteJob(private val messageServerIds: LongArray, private val th override var failureCount: Int = 0 override val maxFailureCount: Int = 1 - override fun execute() { + override fun execute(dispatcherName: String) { val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider val numberToDelete = messageServerIds.size Log.d(TAG, "Deleting $numberToDelete messages") @@ -39,10 +39,10 @@ class OpenGroupDeleteJob(private val messageServerIds: LongArray, private val th } Log.d(TAG, "Deleted ${messageIds.first.size + messageIds.second.size} messages successfully") - delegate?.handleJobSucceeded(this) + delegate?.handleJobSucceeded(this, dispatcherName) } catch (e: Exception) { - delegate?.handleJobFailed(this, e) + delegate?.handleJobFailed(this, dispatcherName, e) } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/TrimThreadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/TrimThreadJob.kt index e02a2f00e5..d082ac7088 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/TrimThreadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/TrimThreadJob.kt @@ -20,7 +20,7 @@ class TrimThreadJob(val threadId: Long, val openGroupId: String?) : Job { const val THREAD_LENGTH_TRIGGER_SIZE = 2000 } - override fun execute() { + override fun execute(dispatcherName: String) { val context = MessagingModuleConfiguration.shared.context val trimmingEnabled = TextSecurePreferences.isThreadLengthTrimmingEnabled(context) val storage = MessagingModuleConfiguration.shared.storage @@ -29,7 +29,7 @@ class TrimThreadJob(val threadId: Long, val openGroupId: String?) : Job { val oldestMessageTime = System.currentTimeMillis() - TRIM_TIME_LIMIT storage.trimThreadBefore(threadId, oldestMessageTime) } - delegate?.handleJobSucceeded(this) + delegate?.handleJobSucceeded(this, dispatcherName) } override fun serialize(): Data {