Fixed a few bugs, added logging and removed some old code

Added the ability to copy the sessionId of open group URL from the conversation menu
Added additional logging to the BatchMessageReceiveJob to make future debugging easier
Removed the OpenGroupMigrator
Updated the JobQueue logging to provide more insight
Fixed an issue where the database migrations weren't blocking which could result in failing/crashing SQL queries
Fixed an issue where the new database file wouldn't be removed if a migration error was thrown
Fixed an issue where the new database could exist in an invalid state and the app wouldn't attempt to remigrate
Fixed an incorrectly throw exception in the PassphrasePromptActivity
This commit is contained in:
Morgan Pretty 2023-02-03 13:33:52 +11:00
parent cf916a5762
commit 0256735135
24 changed files with 204 additions and 619 deletions

View File

@ -64,7 +64,6 @@ import org.thoughtcrime.securesms.dependencies.DatabaseComponent;
import org.thoughtcrime.securesms.dependencies.DatabaseModule; import org.thoughtcrime.securesms.dependencies.DatabaseModule;
import org.thoughtcrime.securesms.emoji.EmojiSource; import org.thoughtcrime.securesms.emoji.EmojiSource;
import org.thoughtcrime.securesms.groups.OpenGroupManager; import org.thoughtcrime.securesms.groups.OpenGroupManager;
import org.thoughtcrime.securesms.groups.OpenGroupMigrator;
import org.thoughtcrime.securesms.home.HomeActivity; import org.thoughtcrime.securesms.home.HomeActivity;
import org.thoughtcrime.securesms.jobmanager.JobManager; import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer; import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer;
@ -206,9 +205,6 @@ public class ApplicationContext extends Application implements DefaultLifecycleO
storage, storage,
messageDataProvider, messageDataProvider,
()-> KeyPairUtilities.INSTANCE.getUserED25519KeyPair(this)); ()-> KeyPairUtilities.INSTANCE.getUserED25519KeyPair(this));
// migrate session open group data
OpenGroupMigrator.migrate(getDatabaseComponent());
// end migration
callMessageProcessor = new CallMessageProcessor(this, textSecurePreferences, ProcessLifecycleOwner.get().getLifecycle(), storage); callMessageProcessor = new CallMessageProcessor(this, textSecurePreferences, ProcessLifecycleOwner.get().getLifecycle(), storage);
Log.i(TAG, "onCreate()"); Log.i(TAG, "onCreate()");
startKovenant(); startKovenant();

View File

@ -210,8 +210,7 @@ public class PassphrasePromptActivity extends BaseActionBarActivity {
try { try {
signature = biometricSecretProvider.getOrCreateBiometricSignature(this); signature = biometricSecretProvider.getOrCreateBiometricSignature(this);
hasSignatureObject = true; hasSignatureObject = true;
throw new InvalidKeyException("e"); } catch (Exception e) {
} catch (InvalidKeyException e) {
signature = null; signature = null;
hasSignatureObject = false; hasSignatureObject = false;
Log.e(TAG, "Error getting / creating signature", e); Log.e(TAG, "Error getting / creating signature", e);

View File

@ -963,6 +963,18 @@ class ConversationActivityV2 : PassphraseRequiredActionBarActivity(), InputBarDe
Toast.makeText(this, R.string.copied_to_clipboard, Toast.LENGTH_SHORT).show() Toast.makeText(this, R.string.copied_to_clipboard, Toast.LENGTH_SHORT).show()
} }
override fun copyOpenGroupUrl(thread: Recipient) {
if (!thread.isOpenGroupRecipient) { return }
val threadId = threadDb.getThreadIdIfExistsFor(thread) ?: return
val openGroup = lokiThreadDb.getOpenGroupChat(threadId) ?: return
val clip = ClipData.newPlainText("Community URL", openGroup.joinURL)
val manager = getSystemService(PassphraseRequiredActionBarActivity.CLIPBOARD_SERVICE) as ClipboardManager
manager.setPrimaryClip(clip)
Toast.makeText(this, R.string.copied_to_clipboard, Toast.LENGTH_SHORT).show()
}
override fun showExpiringMessagesDialog(thread: Recipient) { override fun showExpiringMessagesDialog(thread: Recipient) {
if (thread.isClosedGroupRecipient) { if (thread.isClosedGroupRecipient) {
val group = groupDb.getGroup(thread.address.toGroupString()).orNull() val group = groupDb.getGroup(thread.address.toGroupString()).orNull()

View File

@ -78,6 +78,10 @@ object ConversationMenuHelper {
inflater.inflate(R.menu.menu_conversation_expiration_off, menu) inflater.inflate(R.menu.menu_conversation_expiration_off, menu)
} }
} }
// One-on-one chat menu allows copying the session id
if (thread.isContactRecipient) {
inflater.inflate(R.menu.menu_conversation_copy_session_id, menu)
}
// One-on-one chat menu (options that should only be present for one-on-one chats) // One-on-one chat menu (options that should only be present for one-on-one chats)
if (thread.isContactRecipient) { if (thread.isContactRecipient) {
if (thread.isBlocked) { if (thread.isBlocked) {
@ -154,6 +158,7 @@ object ConversationMenuHelper {
R.id.menu_block -> { block(context, thread, deleteThread = false) } R.id.menu_block -> { block(context, thread, deleteThread = false) }
R.id.menu_block_delete -> { blockAndDelete(context, thread) } R.id.menu_block_delete -> { blockAndDelete(context, thread) }
R.id.menu_copy_session_id -> { copySessionID(context, thread) } R.id.menu_copy_session_id -> { copySessionID(context, thread) }
R.id.menu_copy_open_group_url -> { copyOpenGroupUrl(context, thread) }
R.id.menu_edit_group -> { editClosedGroup(context, thread) } R.id.menu_edit_group -> { editClosedGroup(context, thread) }
R.id.menu_leave_group -> { leaveClosedGroup(context, thread) } R.id.menu_leave_group -> { leaveClosedGroup(context, thread) }
R.id.menu_invite_to_open_group -> { inviteContacts(context, thread) } R.id.menu_invite_to_open_group -> { inviteContacts(context, thread) }
@ -270,6 +275,12 @@ object ConversationMenuHelper {
listener.copySessionID(thread.address.toString()) listener.copySessionID(thread.address.toString())
} }
private fun copyOpenGroupUrl(context: Context, thread: Recipient) {
if (!thread.isOpenGroupRecipient) { return }
val listener = context as? ConversationMenuListener ?: return
listener.copyOpenGroupUrl(thread)
}
private fun editClosedGroup(context: Context, thread: Recipient) { private fun editClosedGroup(context: Context, thread: Recipient) {
if (!thread.isClosedGroupRecipient) { return } if (!thread.isClosedGroupRecipient) { return }
val intent = Intent(context, EditClosedGroupActivity::class.java) val intent = Intent(context, EditClosedGroupActivity::class.java)
@ -344,6 +355,7 @@ object ConversationMenuHelper {
fun block(deleteThread: Boolean = false) fun block(deleteThread: Boolean = false)
fun unblock() fun unblock()
fun copySessionID(sessionId: String) fun copySessionID(sessionId: String)
fun copyOpenGroupUrl(thread: Recipient)
fun showExpiringMessagesDialog(thread: Recipient) fun showExpiringMessagesDialog(thread: Recipient)
} }

View File

@ -57,7 +57,6 @@ import org.thoughtcrime.securesms.database.model.MessageRecord;
import org.thoughtcrime.securesms.database.model.MmsMessageRecord; import org.thoughtcrime.securesms.database.model.MmsMessageRecord;
import org.thoughtcrime.securesms.database.model.ThreadRecord; import org.thoughtcrime.securesms.database.model.ThreadRecord;
import org.thoughtcrime.securesms.dependencies.DatabaseComponent; import org.thoughtcrime.securesms.dependencies.DatabaseComponent;
import org.thoughtcrime.securesms.groups.OpenGroupMigrator;
import org.thoughtcrime.securesms.mms.Slide; import org.thoughtcrime.securesms.mms.Slide;
import org.thoughtcrime.securesms.mms.SlideDeck; import org.thoughtcrime.securesms.mms.SlideDeck;
import org.thoughtcrime.securesms.notifications.MarkReadReceiver; import org.thoughtcrime.securesms.notifications.MarkReadReceiver;
@ -800,77 +799,6 @@ public class ThreadDatabase extends Database {
return query; return query;
} }
@NotNull
public List<ThreadRecord> getHttpOxenOpenGroups() {
String where = TABLE_NAME+"."+ADDRESS+" LIKE ?";
String selection = OpenGroupMigrator.HTTP_PREFIX+OpenGroupMigrator.OPEN_GET_SESSION_TRAILING_DOT_ENCODED +"%";
SQLiteDatabase db = databaseHelper.getReadableDatabase();
String query = createQuery(where, 0);
Cursor cursor = db.rawQuery(query, new String[]{selection});
if (cursor == null) {
return Collections.emptyList();
}
List<ThreadRecord> threads = new ArrayList<>();
try {
Reader reader = readerFor(cursor);
ThreadRecord record;
while ((record = reader.getNext()) != null) {
threads.add(record);
}
} finally {
cursor.close();
}
return threads;
}
@NotNull
public List<ThreadRecord> getLegacyOxenOpenGroups() {
String where = TABLE_NAME+"."+ADDRESS+" LIKE ?";
String selection = OpenGroupMigrator.LEGACY_GROUP_ENCODED_ID+"%";
SQLiteDatabase db = databaseHelper.getReadableDatabase();
String query = createQuery(where, 0);
Cursor cursor = db.rawQuery(query, new String[]{selection});
if (cursor == null) {
return Collections.emptyList();
}
List<ThreadRecord> threads = new ArrayList<>();
try {
Reader reader = readerFor(cursor);
ThreadRecord record;
while ((record = reader.getNext()) != null) {
threads.add(record);
}
} finally {
cursor.close();
}
return threads;
}
@NotNull
public List<ThreadRecord> getHttpsOxenOpenGroups() {
String where = TABLE_NAME+"."+ADDRESS+" LIKE ?";
String selection = OpenGroupMigrator.NEW_GROUP_ENCODED_ID+"%";
SQLiteDatabase db = databaseHelper.getReadableDatabase();
String query = createQuery(where, 0);
Cursor cursor = db.rawQuery(query, new String[]{selection});
if (cursor == null) {
return Collections.emptyList();
}
List<ThreadRecord> threads = new ArrayList<>();
try {
Reader reader = readerFor(cursor);
ThreadRecord record;
while ((record = reader.getNext()) != null) {
threads.add(record);
}
} finally {
cursor.close();
}
return threads;
}
public void migrateEncodedGroup(long threadId, @NotNull String newEncodedGroupId) { public void migrateEncodedGroup(long threadId, @NotNull String newEncodedGroupId) {
ContentValues contentValues = new ContentValues(1); ContentValues contentValues = new ContentValues(1);
contentValues.put(ADDRESS, newEncodedGroupId); contentValues.put(ADDRESS, newEncodedGroupId);

View File

@ -97,25 +97,40 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper {
private final DatabaseSecret databaseSecret; private final DatabaseSecret databaseSecret;
public SQLCipherOpenHelper(@NonNull Context context, @NonNull DatabaseSecret databaseSecret) { public SQLCipherOpenHelper(@NonNull Context context, @NonNull DatabaseSecret databaseSecret) {
super(context, DATABASE_NAME, databaseSecret.asString(), null, DATABASE_VERSION, MIN_DATABASE_VERSION, null, new SQLiteDatabaseHook() { super(
@Override context,
public void preKey(SQLiteConnection connection) { DATABASE_NAME,
SQLCipherOpenHelper.applySQLCipherPragmas(connection, true); databaseSecret.asString(),
} null,
DATABASE_VERSION,
@Override MIN_DATABASE_VERSION,
public void postKey(SQLiteConnection connection) { null,
SQLCipherOpenHelper.applySQLCipherPragmas(connection, true); new SQLiteDatabaseHook() {
@Override
// if not vacuumed in a while, perform that operation public void preKey(SQLiteConnection connection) {
long currentTime = System.currentTimeMillis(); SQLCipherOpenHelper.applySQLCipherPragmas(connection, true);
// 7 days
if (currentTime - TextSecurePreferences.getLastVacuumTime(context) > 604_800_000) {
connection.execute("VACUUM;", null, null);
TextSecurePreferences.setLastVacuumNow(context);
} }
}
}, true); @Override
public void postKey(SQLiteConnection connection) {
SQLCipherOpenHelper.applySQLCipherPragmas(connection, true);
// if not vacuumed in a while, perform that operation
long currentTime = System.currentTimeMillis();
// 7 days
if (currentTime - TextSecurePreferences.getLastVacuumTime(context) > 604_800_000) {
connection.execute("VACUUM;", null, null);
TextSecurePreferences.setLastVacuumNow(context);
}
}
},
// Note: Now that we support concurrent database reads the migrations are actually non-blocking
// because of this we need to initially open the database with writeAheadLogging (WAL mode) disabled
// and enable it once the database officially opens it's connection (which will cause it to re-connect
// in WAL mode) - this is a little inefficient but will prevent SQL-related errors/crashes due to
// incomplete migrations
false
);
this.context = context.getApplicationContext(); this.context = context.getApplicationContext();
this.databaseSecret = databaseSecret; this.databaseSecret = databaseSecret;
@ -150,11 +165,11 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper {
// If the old SQLCipher3 database file doesn't exist then no need to do anything // If the old SQLCipher3 database file doesn't exist then no need to do anything
if (!oldDbFile.exists()) { return; } if (!oldDbFile.exists()) { return; }
try { // Define the location for the new database
// Define the location for the new database String newDbPath = context.getDatabasePath(DATABASE_NAME).getPath();
String newDbPath = context.getDatabasePath(DATABASE_NAME).getPath(); File newDbFile = new File(newDbPath);
File newDbFile = new File(newDbPath);
try {
// If the new database file already exists then check if it's valid first, if it's in an // If the new database file already exists then check if it's valid first, if it's in an
// invalid state we should delete it and try to migrate again // invalid state we should delete it and try to migrate again
if (newDbFile.exists()) { if (newDbFile.exists()) {
@ -162,10 +177,24 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper {
// assume the user hasn't downgraded for some reason and made changes to the old database and // assume the user hasn't downgraded for some reason and made changes to the old database and
// can remove the old database file (it won't be used anymore) // can remove the old database file (it won't be used anymore)
if (oldDbFile.lastModified() <= newDbFile.lastModified()) { if (oldDbFile.lastModified() <= newDbFile.lastModified()) {
// TODO: Delete 'CIPHER3_DATABASE_NAME' once enough time has past try {
// //noinspection ResultOfMethodCallIgnored SQLiteDatabase newDb = SQLCipherOpenHelper.open(newDbPath, databaseSecret, true);
// oldDbFile.delete(); int version = newDb.getVersion();
return; newDb.close();
// Make sure the new database has it's version set correctly (if not then the migration didn't
// fully succeed and the database will try to create all it's tables and immediately fail so
// we will need to remove and remigrate)
if (version > 0) {
// TODO: Delete 'CIPHER3_DATABASE_NAME' once enough time has past
// //noinspection ResultOfMethodCallIgnored
// oldDbFile.delete();
return;
}
}
catch (Exception e) {
Log.i(TAG, "Failed to retrieve version from new database, assuming invalid and remigrating");
}
} }
// If the old database does have newer changes then the new database could have stale/invalid // If the old database does have newer changes then the new database could have stale/invalid
@ -207,6 +236,11 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper {
catch (Exception e) { catch (Exception e) {
Log.e(TAG, "Migration from SQLCipher3 to SQLCipher4 failed", e); Log.e(TAG, "Migration from SQLCipher3 to SQLCipher4 failed", e);
// If an exception was thrown then we should remove the new database file (it's probably invalid)
if (!newDbFile.delete()) {
Log.e(TAG, "Unable to delete invalid new database file");
}
// Notify the user of the issue so they know they can downgrade until the issue is fixed // Notify the user of the issue so they know they can downgrade until the issue is fixed
NotificationManager notificationManager = context.getSystemService(NotificationManager.class); NotificationManager notificationManager = context.getSystemService(NotificationManager.class);
String channelId = context.getString(R.string.NotificationChannel_failures); String channelId = context.getString(R.string.NotificationChannel_failures);
@ -559,6 +593,15 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper {
} }
} }
@Override
public void onOpen(SQLiteDatabase db) {
super.onOpen(db);
// Now that the database is officially open (ie. the migrations are completed) we want to enable
// write ahead logging (WAL mode) to officially support concurrent read connections
db.enableWriteAheadLogging();
}
public void markCurrent(SQLiteDatabase db) { public void markCurrent(SQLiteDatabase db) {
db.setVersion(DATABASE_VERSION); db.setVersion(DATABASE_VERSION);
} }

View File

@ -1,139 +0,0 @@
package org.thoughtcrime.securesms.groups
import androidx.annotation.VisibleForTesting
import org.session.libsession.messaging.open_groups.OpenGroupApi
import org.session.libsession.utilities.recipients.Recipient
import org.session.libsignal.utilities.Hex
import org.thoughtcrime.securesms.database.model.ThreadRecord
import org.thoughtcrime.securesms.dependencies.DatabaseComponent
object OpenGroupMigrator {
const val HTTP_PREFIX = "__loki_public_chat_group__!687474703a2f2f"
private const val HTTPS_PREFIX = "__loki_public_chat_group__!68747470733a2f2f"
const val OPEN_GET_SESSION_TRAILING_DOT_ENCODED = "6f70656e2e67657473657373696f6e2e6f72672e"
const val LEGACY_GROUP_ENCODED_ID = "__loki_public_chat_group__!687474703a2f2f3131362e3230332e37302e33332e" // old IP based toByteArray()
const val NEW_GROUP_ENCODED_ID = "__loki_public_chat_group__!68747470733a2f2f6f70656e2e67657473657373696f6e2e6f72672e" // new URL based toByteArray()
data class OpenGroupMapping(val stub: String, val legacyThreadId: Long, val newThreadId: Long?)
@VisibleForTesting
fun Recipient.roomStub(): String? {
if (!isOpenGroupRecipient) return null
val serialized = address.serialize()
if (serialized.startsWith(LEGACY_GROUP_ENCODED_ID)) {
return serialized.replace(LEGACY_GROUP_ENCODED_ID,"")
} else if (serialized.startsWith(NEW_GROUP_ENCODED_ID)) {
return serialized.replace(NEW_GROUP_ENCODED_ID,"")
} else if (serialized.startsWith(HTTP_PREFIX + OPEN_GET_SESSION_TRAILING_DOT_ENCODED)) {
return serialized.replace(HTTP_PREFIX + OPEN_GET_SESSION_TRAILING_DOT_ENCODED, "")
}
return null
}
@VisibleForTesting
fun getExistingMappings(legacy: List<ThreadRecord>, new: List<ThreadRecord>): List<OpenGroupMapping> {
val legacyStubsMapping = legacy.mapNotNull { thread ->
val stub = thread.recipient.roomStub()
stub?.let { it to thread.threadId }
}
val newStubsMapping = new.mapNotNull { thread ->
val stub = thread.recipient.roomStub()
stub?.let { it to thread.threadId }
}
return legacyStubsMapping.map { (legacyEncodedStub, legacyId) ->
// get 'new' open group thread ID if stubs match
OpenGroupMapping(
legacyEncodedStub,
legacyId,
newStubsMapping.firstOrNull { (newEncodedStub, _) -> newEncodedStub == legacyEncodedStub }?.second
)
}
}
@JvmStatic
fun migrate(databaseComponent: DatabaseComponent) {
// migrate thread db
val threadDb = databaseComponent.threadDatabase()
val legacyOpenGroups = threadDb.legacyOxenOpenGroups
val httpBasedNewGroups = threadDb.httpOxenOpenGroups
if (legacyOpenGroups.isEmpty() && httpBasedNewGroups.isEmpty()) return // no need to migrate
val newOpenGroups = threadDb.httpsOxenOpenGroups
val firstStepMigration = getExistingMappings(legacyOpenGroups, newOpenGroups)
val secondStepMigration = getExistingMappings(httpBasedNewGroups, newOpenGroups)
val groupDb = databaseComponent.groupDatabase()
val lokiApiDb = databaseComponent.lokiAPIDatabase()
val smsDb = databaseComponent.smsDatabase()
val mmsDb = databaseComponent.mmsDatabase()
val lokiMessageDatabase = databaseComponent.lokiMessageDatabase()
val lokiThreadDatabase = databaseComponent.lokiThreadDatabase()
firstStepMigration.forEach { (stub, old, new) ->
val legacyEncodedGroupId = LEGACY_GROUP_ENCODED_ID+stub
if (new == null) {
val newEncodedGroupId = NEW_GROUP_ENCODED_ID+stub
// migrate thread and group encoded values
threadDb.migrateEncodedGroup(old, newEncodedGroupId)
groupDb.migrateEncodedGroup(legacyEncodedGroupId, newEncodedGroupId)
// migrate Loki API DB values
// decode the hex to bytes, decode byte array to string i.e. "oxen" or "session"
val decodedStub = Hex.fromStringCondensed(stub).decodeToString()
val legacyLokiServerId = "${OpenGroupApi.legacyDefaultServer}.$decodedStub"
val newLokiServerId = "${OpenGroupApi.defaultServer}.$decodedStub"
lokiApiDb.migrateLegacyOpenGroup(legacyLokiServerId, newLokiServerId)
// migrate loki thread db server info
val oldServerInfo = lokiThreadDatabase.getOpenGroupChat(old)
val newServerInfo = oldServerInfo!!.copy(server = OpenGroupApi.defaultServer, id = newLokiServerId)
lokiThreadDatabase.setOpenGroupChat(newServerInfo, old)
} else {
// has a legacy and a new one
// migrate SMS and MMS tables
smsDb.migrateThreadId(old, new)
mmsDb.migrateThreadId(old, new)
lokiMessageDatabase.migrateThreadId(old, new)
// delete group for legacy ID
groupDb.delete(legacyEncodedGroupId)
// delete thread for legacy ID
threadDb.deleteConversation(old)
lokiThreadDatabase.removeOpenGroupChat(old)
}
// maybe migrate jobs here
}
secondStepMigration.forEach { (stub, old, new) ->
val legacyEncodedGroupId = HTTP_PREFIX + OPEN_GET_SESSION_TRAILING_DOT_ENCODED + stub
if (new == null) {
val newEncodedGroupId = NEW_GROUP_ENCODED_ID+stub
// migrate thread and group encoded values
threadDb.migrateEncodedGroup(old, newEncodedGroupId)
groupDb.migrateEncodedGroup(legacyEncodedGroupId, newEncodedGroupId)
// migrate Loki API DB values
// decode the hex to bytes, decode byte array to string i.e. "oxen" or "session"
val decodedStub = Hex.fromStringCondensed(stub).decodeToString()
val legacyLokiServerId = "${OpenGroupApi.httpDefaultServer}.$decodedStub"
val newLokiServerId = "${OpenGroupApi.defaultServer}.$decodedStub"
lokiApiDb.migrateLegacyOpenGroup(legacyLokiServerId, newLokiServerId)
// migrate loki thread db server info
val oldServerInfo = lokiThreadDatabase.getOpenGroupChat(old)
val newServerInfo = oldServerInfo!!.copy(server = OpenGroupApi.defaultServer, id = newLokiServerId)
lokiThreadDatabase.setOpenGroupChat(newServerInfo, old)
} else {
// has a legacy and a new one
// migrate SMS and MMS tables
smsDb.migrateThreadId(old, new)
mmsDb.migrateThreadId(old, new)
lokiMessageDatabase.migrateThreadId(old, new)
// delete group for legacy ID
groupDb.delete(legacyEncodedGroupId)
// delete thread for legacy ID
threadDb.deleteConversation(old)
lokiThreadDatabase.removeOpenGroupChat(old)
}
// maybe migrate jobs here
}
}
}

View File

@ -60,7 +60,7 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor
// FIXME: Using a job here seems like a bad idea... // FIXME: Using a job here seems like a bad idea...
MessageReceiveParameters(envelope.toByteArray(), serverHash, null) MessageReceiveParameters(envelope.toByteArray(), serverHash, null)
} }
BatchMessageReceiveJob(params).executeAsync() BatchMessageReceiveJob(params).executeAsync("background")
} }
promises.add(dmsPromise) promises.add(dmsPromise)

View File

@ -2,6 +2,10 @@
<menu <menu
xmlns:android="http://schemas.android.com/apk/res/android"> xmlns:android="http://schemas.android.com/apk/res/android">
<item
android:title="@string/ConversationActivity_copy_open_group_url"
android:id="@+id/menu_copy_open_group_url" />
<item <item
android:title="@string/ConversationActivity_invite_to_open_group" android:title="@string/ConversationActivity_invite_to_open_group"
android:id="@+id/menu_invite_to_open_group" /> android:id="@+id/menu_invite_to_open_group" />

View File

@ -77,6 +77,7 @@
<string name="ConversationActivity_attachment_exceeds_size_limits">Attachment exceeds size limits for the type of message you\'re sending.</string> <string name="ConversationActivity_attachment_exceeds_size_limits">Attachment exceeds size limits for the type of message you\'re sending.</string>
<string name="ConversationActivity_unable_to_record_audio">Unable to record audio!</string> <string name="ConversationActivity_unable_to_record_audio">Unable to record audio!</string>
<string name="ConversationActivity_there_is_no_app_available_to_handle_this_link_on_your_device">There is no app available to handle this link on your device.</string> <string name="ConversationActivity_there_is_no_app_available_to_handle_this_link_on_your_device">There is no app available to handle this link on your device.</string>
<string name="ConversationActivity_copy_open_group_url">Copy Community URL</string>
<string name="ConversationActivity_invite_to_open_group">Add members</string> <string name="ConversationActivity_invite_to_open_group">Add members</string>
<string name="ConversationActivity_to_send_audio_messages_allow_signal_access_to_your_microphone">Session needs microphone access to send audio messages.</string> <string name="ConversationActivity_to_send_audio_messages_allow_signal_access_to_your_microphone">Session needs microphone access to send audio messages.</string>
<string name="ConversationActivity_signal_requires_the_microphone_permission_in_order_to_send_audio_messages">Session needs microphone access to send audio messages, but it has been permanently denied. Please continue to app settings, select \"Permissions\", and enable \"Microphone\".</string> <string name="ConversationActivity_signal_requires_the_microphone_permission_in_order_to_send_audio_messages">Session needs microphone access to send audio messages, but it has been permanently denied. Please continue to app settings, select \"Permissions\", and enable \"Microphone\".</string>

View File

@ -1,281 +0,0 @@
package org.thoughtcrime.securesms.util
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Test
import org.mockito.kotlin.KStubbing
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.doAnswer
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.eq
import org.mockito.kotlin.mock
import org.mockito.kotlin.verify
import org.mockito.kotlin.verifyNoMoreInteractions
import org.session.libsession.messaging.open_groups.OpenGroup
import org.session.libsession.messaging.open_groups.OpenGroupApi
import org.session.libsession.utilities.Address
import org.session.libsession.utilities.recipients.Recipient
import org.thoughtcrime.securesms.database.GroupDatabase
import org.thoughtcrime.securesms.database.LokiAPIDatabase
import org.thoughtcrime.securesms.database.LokiMessageDatabase
import org.thoughtcrime.securesms.database.LokiThreadDatabase
import org.thoughtcrime.securesms.database.MmsDatabase
import org.thoughtcrime.securesms.database.SmsDatabase
import org.thoughtcrime.securesms.database.ThreadDatabase
import org.thoughtcrime.securesms.database.model.ThreadRecord
import org.thoughtcrime.securesms.dependencies.DatabaseComponent
import org.thoughtcrime.securesms.groups.OpenGroupMigrator
import org.thoughtcrime.securesms.groups.OpenGroupMigrator.OpenGroupMapping
import org.thoughtcrime.securesms.groups.OpenGroupMigrator.roomStub
class OpenGroupMigrationTests {
companion object {
const val EXAMPLE_LEGACY_ENCODED_OPEN_GROUP = "__loki_public_chat_group__!687474703a2f2f3131362e3230332e37302e33332e6f78656e"
const val EXAMPLE_NEW_ENCODED_OPEN_GROUP = "__loki_public_chat_group__!68747470733a2f2f6f70656e2e67657473657373696f6e2e6f72672e6f78656e"
const val OXEN_STUB_HEX = "6f78656e"
const val EXAMPLE_LEGACY_SERVER_ID = "http://116.203.70.33.oxen"
const val EXAMPLE_NEW_SERVER_ID = "https://open.getsession.org.oxen"
const val LEGACY_THREAD_ID = 1L
const val NEW_THREAD_ID = 2L
}
private fun legacyOpenGroupRecipient(additionalMocks: ((KStubbing<Recipient>) -> Unit) ? = null) = mock<Recipient> {
on { address } doReturn Address.fromSerialized(EXAMPLE_LEGACY_ENCODED_OPEN_GROUP)
on { isOpenGroupRecipient } doReturn true
additionalMocks?.let { it(this) }
}
private fun newOpenGroupRecipient(additionalMocks: ((KStubbing<Recipient>) -> Unit) ? = null) = mock<Recipient> {
on { address } doReturn Address.fromSerialized(EXAMPLE_NEW_ENCODED_OPEN_GROUP)
on { isOpenGroupRecipient } doReturn true
additionalMocks?.let { it(this) }
}
private fun legacyThreadRecord(additionalRecipientMocks: ((KStubbing<Recipient>) -> Unit) ? = null, additionalThreadMocks: ((KStubbing<ThreadRecord>) -> Unit)? = null) = mock<ThreadRecord> {
val returnedRecipient = legacyOpenGroupRecipient(additionalRecipientMocks)
on { recipient } doReturn returnedRecipient
on { threadId } doReturn LEGACY_THREAD_ID
}
private fun newThreadRecord(additionalRecipientMocks: ((KStubbing<Recipient>) -> Unit)? = null, additionalThreadMocks: ((KStubbing<ThreadRecord>) -> Unit)? = null) = mock<ThreadRecord> {
val returnedRecipient = newOpenGroupRecipient(additionalRecipientMocks)
on { recipient } doReturn returnedRecipient
on { threadId } doReturn NEW_THREAD_ID
}
@Test
fun `it should generate the correct room stubs for legacy groups`() {
val mockRecipient = legacyOpenGroupRecipient()
assertEquals(OXEN_STUB_HEX, mockRecipient.roomStub())
}
@Test
fun `it should generate the correct room stubs for new groups`() {
val mockNewRecipient = newOpenGroupRecipient()
assertEquals(OXEN_STUB_HEX, mockNewRecipient.roomStub())
}
@Test
fun `it should return correct mappings`() {
val legacyThread = legacyThreadRecord()
val newThread = newThreadRecord()
val expectedMapping = listOf(
OpenGroupMapping(OXEN_STUB_HEX, LEGACY_THREAD_ID, NEW_THREAD_ID)
)
assertTrue(expectedMapping.containsAll(OpenGroupMigrator.getExistingMappings(listOf(legacyThread), listOf(newThread))))
}
@Test
fun `it should return no mappings if there are no legacy open groups`() {
val mappings = OpenGroupMigrator.getExistingMappings(listOf(), listOf())
assertTrue(mappings.isEmpty())
}
@Test
fun `it should return no mappings if there are only new open groups`() {
val newThread = newThreadRecord()
val mappings = OpenGroupMigrator.getExistingMappings(emptyList(), listOf(newThread))
assertTrue(mappings.isEmpty())
}
@Test
fun `it should return null new thread in mappings if there are only legacy open groups`() {
val legacyThread = legacyThreadRecord()
val mappings = OpenGroupMigrator.getExistingMappings(listOf(legacyThread), emptyList())
val expectedMappings = listOf(
OpenGroupMapping(OXEN_STUB_HEX, LEGACY_THREAD_ID, null)
)
assertTrue(expectedMappings.containsAll(mappings))
}
@Test
fun `test migration thread DB calls legacy and returns if no legacy official groups`() {
val mockedThreadDb = mock<ThreadDatabase> {
on { legacyOxenOpenGroups } doReturn emptyList()
}
val mockedDbComponent = mock<DatabaseComponent> {
on { threadDatabase() } doReturn mockedThreadDb
}
OpenGroupMigrator.migrate(mockedDbComponent)
verify(mockedDbComponent).threadDatabase()
verify(mockedThreadDb).legacyOxenOpenGroups
verifyNoMoreInteractions(mockedThreadDb)
}
@Test
fun `it should migrate on thread, group and loki dbs with correct values for legacy only migration`() {
// mock threadDB
val capturedThreadId = argumentCaptor<Long>()
val capturedNewEncoded = argumentCaptor<String>()
val mockedThreadDb = mock<ThreadDatabase> {
val legacyThreadRecord = legacyThreadRecord()
on { legacyOxenOpenGroups } doReturn listOf(legacyThreadRecord)
on { httpsOxenOpenGroups } doReturn emptyList()
on { migrateEncodedGroup(capturedThreadId.capture(), capturedNewEncoded.capture()) } doAnswer {}
}
// mock groupDB
val capturedGroupLegacyEncoded = argumentCaptor<String>()
val capturedGroupNewEncoded = argumentCaptor<String>()
val mockedGroupDb = mock<GroupDatabase> {
on {
migrateEncodedGroup(
capturedGroupLegacyEncoded.capture(),
capturedGroupNewEncoded.capture()
)
} doAnswer {}
}
// mock LokiAPIDB
val capturedLokiLegacyGroup = argumentCaptor<String>()
val capturedLokiNewGroup = argumentCaptor<String>()
val mockedLokiApi = mock<LokiAPIDatabase> {
on { migrateLegacyOpenGroup(capturedLokiLegacyGroup.capture(), capturedLokiNewGroup.capture()) } doAnswer {}
}
val pubKey = OpenGroupApi.defaultServerPublicKey
val room = "oxen"
val legacyServer = OpenGroupApi.legacyDefaultServer
val newServer = OpenGroupApi.defaultServer
val lokiThreadOpenGroup = argumentCaptor<OpenGroup>()
val mockedLokiThreadDb = mock<LokiThreadDatabase> {
on { getOpenGroupChat(eq(LEGACY_THREAD_ID)) } doReturn OpenGroup(legacyServer, room, "Oxen", 0, pubKey)
on { setOpenGroupChat(lokiThreadOpenGroup.capture(), eq(LEGACY_THREAD_ID)) } doAnswer {}
}
val mockedDbComponent = mock<DatabaseComponent> {
on { threadDatabase() } doReturn mockedThreadDb
on { groupDatabase() } doReturn mockedGroupDb
on { lokiAPIDatabase() } doReturn mockedLokiApi
on { lokiThreadDatabase() } doReturn mockedLokiThreadDb
}
OpenGroupMigrator.migrate(mockedDbComponent)
// expect threadDB migration to reflect new thread values:
// thread ID = 1, encoded ID = new encoded ID
assertEquals(LEGACY_THREAD_ID, capturedThreadId.firstValue)
assertEquals(EXAMPLE_NEW_ENCODED_OPEN_GROUP, capturedNewEncoded.firstValue)
// expect groupDB migration to reflect new thread values:
// legacy encoded ID, new encoded ID
assertEquals(EXAMPLE_LEGACY_ENCODED_OPEN_GROUP, capturedGroupLegacyEncoded.firstValue)
assertEquals(EXAMPLE_NEW_ENCODED_OPEN_GROUP, capturedGroupNewEncoded.firstValue)
// expect Loki API DB migration to reflect new thread values:
assertEquals("${OpenGroupApi.legacyDefaultServer}.oxen", capturedLokiLegacyGroup.firstValue)
assertEquals("${OpenGroupApi.defaultServer}.oxen", capturedLokiNewGroup.firstValue)
assertEquals(newServer, lokiThreadOpenGroup.firstValue.server)
}
@Test
fun `it should migrate and delete legacy thread with conflicting new and old values`() {
// mock threadDB
val capturedThreadId = argumentCaptor<Long>()
val mockedThreadDb = mock<ThreadDatabase> {
val legacyThreadRecord = legacyThreadRecord()
val newThreadRecord = newThreadRecord()
on { legacyOxenOpenGroups } doReturn listOf(legacyThreadRecord)
on { httpsOxenOpenGroups } doReturn listOf(newThreadRecord)
on { deleteConversation(capturedThreadId.capture()) } doAnswer {}
}
// mock groupDB
val capturedGroupLegacyEncoded = argumentCaptor<String>()
val mockedGroupDb = mock<GroupDatabase> {
on { delete(capturedGroupLegacyEncoded.capture()) } doReturn true
}
// mock LokiAPIDB
val capturedLokiLegacyGroup = argumentCaptor<String>()
val capturedLokiNewGroup = argumentCaptor<String>()
val mockedLokiApi = mock<LokiAPIDatabase> {
on { migrateLegacyOpenGroup(capturedLokiLegacyGroup.capture(), capturedLokiNewGroup.capture()) } doAnswer {}
}
// mock messaging dbs
val migrateMmsFromThreadId = argumentCaptor<Long>()
val migrateMmsToThreadId = argumentCaptor<Long>()
val mockedMmsDb = mock<MmsDatabase> {
on { migrateThreadId(migrateMmsFromThreadId.capture(), migrateMmsToThreadId.capture()) } doAnswer {}
}
val migrateSmsFromThreadId = argumentCaptor<Long>()
val migrateSmsToThreadId = argumentCaptor<Long>()
val mockedSmsDb = mock<SmsDatabase> {
on { migrateThreadId(migrateSmsFromThreadId.capture(), migrateSmsToThreadId.capture()) } doAnswer {}
}
val lokiFromThreadId = argumentCaptor<Long>()
val lokiToThreadId = argumentCaptor<Long>()
val mockedLokiMessageDatabase = mock<LokiMessageDatabase> {
on { migrateThreadId(lokiFromThreadId.capture(), lokiToThreadId.capture()) } doAnswer {}
}
val mockedLokiThreadDb = mock<LokiThreadDatabase> {
on { removeOpenGroupChat(eq(LEGACY_THREAD_ID)) } doAnswer {}
}
val mockedDbComponent = mock<DatabaseComponent> {
on { threadDatabase() } doReturn mockedThreadDb
on { groupDatabase() } doReturn mockedGroupDb
on { lokiAPIDatabase() } doReturn mockedLokiApi
on { mmsDatabase() } doReturn mockedMmsDb
on { smsDatabase() } doReturn mockedSmsDb
on { lokiMessageDatabase() } doReturn mockedLokiMessageDatabase
on { lokiThreadDatabase() } doReturn mockedLokiThreadDb
}
OpenGroupMigrator.migrate(mockedDbComponent)
// should delete thread by thread ID
assertEquals(LEGACY_THREAD_ID, capturedThreadId.firstValue)
// should delete group by legacy encoded ID
assertEquals(EXAMPLE_LEGACY_ENCODED_OPEN_GROUP, capturedGroupLegacyEncoded.firstValue)
// should migrate SMS from legacy thread ID to new thread ID
assertEquals(LEGACY_THREAD_ID, migrateSmsFromThreadId.firstValue)
assertEquals(NEW_THREAD_ID, migrateSmsToThreadId.firstValue)
// should migrate MMS from legacy thread ID to new thread ID
assertEquals(LEGACY_THREAD_ID, migrateMmsFromThreadId.firstValue)
assertEquals(NEW_THREAD_ID, migrateMmsToThreadId.firstValue)
}
}

View File

@ -42,7 +42,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
private val TS_INCOMING_MESSAGE_ID_KEY = "tsIncoming_message_id" private val TS_INCOMING_MESSAGE_ID_KEY = "tsIncoming_message_id"
} }
override fun execute() { override fun execute(dispatcherName: String) {
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val threadID = storage.getThreadIdForMms(databaseMessageID) val threadID = storage.getThreadIdForMms(databaseMessageID)
@ -59,7 +59,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
Log.d("AttachmentDownloadJob", "Setting attachment state = failed, don't have attachment") Log.d("AttachmentDownloadJob", "Setting attachment state = failed, don't have attachment")
messageDataProvider.setAttachmentState(AttachmentState.FAILED, AttachmentId(attachmentID,0), databaseMessageID) messageDataProvider.setAttachmentState(AttachmentState.FAILED, AttachmentId(attachmentID,0), databaseMessageID)
} }
this.handlePermanentFailure(exception) this.handlePermanentFailure(dispatcherName, exception)
} else if (exception == Error.DuplicateData) { } else if (exception == Error.DuplicateData) {
attachment?.let { id -> attachment?.let { id ->
Log.d("AttachmentDownloadJob", "Setting attachment state = done from duplicate data") Log.d("AttachmentDownloadJob", "Setting attachment state = done from duplicate data")
@ -68,7 +68,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
Log.d("AttachmentDownloadJob", "Setting attachment state = done from duplicate data") Log.d("AttachmentDownloadJob", "Setting attachment state = done from duplicate data")
messageDataProvider.setAttachmentState(AttachmentState.DONE, AttachmentId(attachmentID,0), databaseMessageID) messageDataProvider.setAttachmentState(AttachmentState.DONE, AttachmentId(attachmentID,0), databaseMessageID)
} }
this.handleSuccess() this.handleSuccess(dispatcherName)
} else { } else {
if (failureCount + 1 >= maxFailureCount) { if (failureCount + 1 >= maxFailureCount) {
attachment?.let { id -> attachment?.let { id ->
@ -79,7 +79,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
messageDataProvider.setAttachmentState(AttachmentState.FAILED, AttachmentId(attachmentID,0), databaseMessageID) messageDataProvider.setAttachmentState(AttachmentState.FAILED, AttachmentId(attachmentID,0), databaseMessageID)
} }
} }
this.handleFailure(exception) this.handleFailure(dispatcherName, exception)
} }
} }
@ -150,7 +150,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
Log.d("AttachmentDownloadJob", "deleting tempfile") Log.d("AttachmentDownloadJob", "deleting tempfile")
tempFile.delete() tempFile.delete()
Log.d("AttachmentDownloadJob", "succeeding job") Log.d("AttachmentDownloadJob", "succeeding job")
handleSuccess() handleSuccess(dispatcherName)
} catch (e: Exception) { } catch (e: Exception) {
Log.e("AttachmentDownloadJob", "Error processing attachment download", e) Log.e("AttachmentDownloadJob", "Error processing attachment download", e)
tempFile?.delete() tempFile?.delete()
@ -169,17 +169,17 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
} }
} }
private fun handleSuccess() { private fun handleSuccess(dispatcherName: String) {
Log.w("AttachmentDownloadJob", "Attachment downloaded successfully.") Log.w("AttachmentDownloadJob", "Attachment downloaded successfully.")
delegate?.handleJobSucceeded(this) delegate?.handleJobSucceeded(this, dispatcherName)
} }
private fun handlePermanentFailure(e: Exception) { private fun handlePermanentFailure(dispatcherName: String, e: Exception) {
delegate?.handleJobFailedPermanently(this, e) delegate?.handleJobFailedPermanently(this, dispatcherName, e)
} }
private fun handleFailure(e: Exception) { private fun handleFailure(dispatcherName: String, e: Exception) {
delegate?.handleJobFailed(this, e) delegate?.handleJobFailed(this, dispatcherName, e)
} }
private fun createTempFile(): File { private fun createTempFile(): File {

View File

@ -45,29 +45,29 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
private val MESSAGE_SEND_JOB_ID_KEY = "message_send_job_id" private val MESSAGE_SEND_JOB_ID_KEY = "message_send_job_id"
} }
override fun execute() { override fun execute(dispatcherName: String) {
try { try {
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val attachment = messageDataProvider.getScaledSignalAttachmentStream(attachmentID) val attachment = messageDataProvider.getScaledSignalAttachmentStream(attachmentID)
?: return handleFailure(Error.NoAttachment) ?: return handleFailure(dispatcherName, Error.NoAttachment)
val openGroup = storage.getOpenGroup(threadID.toLong()) val openGroup = storage.getOpenGroup(threadID.toLong())
if (openGroup != null) { if (openGroup != null) {
val keyAndResult = upload(attachment, openGroup.server, false) { val keyAndResult = upload(attachment, openGroup.server, false) {
OpenGroupApi.upload(it, openGroup.room, openGroup.server) OpenGroupApi.upload(it, openGroup.room, openGroup.server)
} }
handleSuccess(attachment, keyAndResult.first, keyAndResult.second) handleSuccess(dispatcherName, attachment, keyAndResult.first, keyAndResult.second)
} else { } else {
val keyAndResult = upload(attachment, FileServerApi.server, true) { val keyAndResult = upload(attachment, FileServerApi.server, true) {
FileServerApi.upload(it) FileServerApi.upload(it)
} }
handleSuccess(attachment, keyAndResult.first, keyAndResult.second) handleSuccess(dispatcherName, attachment, keyAndResult.first, keyAndResult.second)
} }
} catch (e: java.lang.Exception) { } catch (e: java.lang.Exception) {
if (e == Error.NoAttachment) { if (e == Error.NoAttachment) {
this.handlePermanentFailure(e) this.handlePermanentFailure(dispatcherName, e)
} else { } else {
this.handleFailure(e) this.handleFailure(dispatcherName, e)
} }
} }
} }
@ -104,9 +104,9 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
return Pair(key, UploadResult(id, "${server}/file/$id", digest)) return Pair(key, UploadResult(id, "${server}/file/$id", digest))
} }
private fun handleSuccess(attachment: SignalServiceAttachmentStream, attachmentKey: ByteArray, uploadResult: UploadResult) { private fun handleSuccess(dispatcherName: String, attachment: SignalServiceAttachmentStream, attachmentKey: ByteArray, uploadResult: UploadResult) {
Log.d(TAG, "Attachment uploaded successfully.") Log.d(TAG, "Attachment uploaded successfully.")
delegate?.handleJobSucceeded(this) delegate?.handleJobSucceeded(this, dispatcherName)
val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider
messageDataProvider.handleSuccessfulAttachmentUpload(attachmentID, attachment, attachmentKey, uploadResult) messageDataProvider.handleSuccessfulAttachmentUpload(attachmentID, attachment, attachmentKey, uploadResult)
if (attachment.contentType.startsWith("audio/")) { if (attachment.contentType.startsWith("audio/")) {
@ -144,16 +144,16 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
storage.resumeMessageSendJobIfNeeded(messageSendJobID) storage.resumeMessageSendJobIfNeeded(messageSendJobID)
} }
private fun handlePermanentFailure(e: Exception) { private fun handlePermanentFailure(dispatcherName: String, e: Exception) {
Log.w(TAG, "Attachment upload failed permanently due to error: $this.") Log.w(TAG, "Attachment upload failed permanently due to error: $this.")
delegate?.handleJobFailedPermanently(this, e) delegate?.handleJobFailedPermanently(this, dispatcherName, e)
MessagingModuleConfiguration.shared.messageDataProvider.handleFailedAttachmentUpload(attachmentID) MessagingModuleConfiguration.shared.messageDataProvider.handleFailedAttachmentUpload(attachmentID)
failAssociatedMessageSendJob(e) failAssociatedMessageSendJob(e)
} }
private fun handleFailure(e: Exception) { private fun handleFailure(dispatcherName: String, e: Exception) {
Log.w(TAG, "Attachment upload failed due to error: $this.") Log.w(TAG, "Attachment upload failed due to error: $this.")
delegate?.handleJobFailed(this, e) delegate?.handleJobFailed(this, dispatcherName, e)
if (failureCount + 1 >= maxFailureCount) { if (failureCount + 1 >= maxFailureCount) {
failAssociatedMessageSendJob(e) failAssociatedMessageSendJob(e)
} }

View File

@ -29,14 +29,14 @@ class BackgroundGroupAddJob(val joinUrl: String): Job {
return "$server.$room" return "$server.$room"
} }
override fun execute() { override fun execute(dispatcherName: String) {
try { try {
val openGroup = OpenGroupUrlParser.parseUrl(joinUrl) val openGroup = OpenGroupUrlParser.parseUrl(joinUrl)
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
val allOpenGroups = storage.getAllOpenGroups().map { it.value.joinURL } val allOpenGroups = storage.getAllOpenGroups().map { it.value.joinURL }
if (allOpenGroups.contains(openGroup.joinUrl())) { if (allOpenGroups.contains(openGroup.joinUrl())) {
Log.e("OpenGroupDispatcher", "Failed to add group because", DuplicateGroupException()) Log.e("OpenGroupDispatcher", "Failed to add group because", DuplicateGroupException())
delegate?.handleJobFailed(this, DuplicateGroupException()) delegate?.handleJobFailed(this, dispatcherName, DuplicateGroupException())
return return
} }
// get image // get image
@ -50,11 +50,11 @@ class BackgroundGroupAddJob(val joinUrl: String): Job {
storage.onOpenGroupAdded(openGroup.server) storage.onOpenGroupAdded(openGroup.server)
} catch (e: Exception) { } catch (e: Exception) {
Log.e("OpenGroupDispatcher", "Failed to add group because",e) Log.e("OpenGroupDispatcher", "Failed to add group because",e)
delegate?.handleJobFailed(this, e) delegate?.handleJobFailed(this, dispatcherName, e)
return return
} }
Log.d("Loki", "Group added successfully") Log.d("Loki", "Group added successfully")
delegate?.handleJobSucceeded(this) delegate?.handleJobSucceeded(this, dispatcherName)
} }
override fun serialize(): Data = Data.Builder() override fun serialize(): Data = Data.Builder()

View File

@ -66,11 +66,11 @@ class BatchMessageReceiveJob(
return storage.getOrCreateThreadIdFor(senderOrSync, message.groupPublicKey, openGroupID) return storage.getOrCreateThreadIdFor(senderOrSync, message.groupPublicKey, openGroupID)
} }
override fun execute() { override fun execute(dispatcherName: String) {
executeAsync().get() executeAsync(dispatcherName).get()
} }
fun executeAsync(): Promise<Unit, Exception> { fun executeAsync(dispatcherName: String): Promise<Unit, Exception> {
return task { return task {
val threadMap = mutableMapOf<Long, MutableList<ParsedMessage>>() val threadMap = mutableMapOf<Long, MutableList<ParsedMessage>>()
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
@ -188,19 +188,21 @@ class BatchMessageReceiveJob(
deferredThreadMap.awaitAll() deferredThreadMap.awaitAll()
} }
if (failures.isEmpty()) { if (failures.isEmpty()) {
handleSuccess() handleSuccess(dispatcherName)
} else { } else {
handleFailure() handleFailure(dispatcherName)
} }
} }
} }
private fun handleSuccess() { private fun handleSuccess(dispatcherName: String) {
this.delegate?.handleJobSucceeded(this) Log.i(TAG, "Completed processing of ${messages.size} messages")
this.delegate?.handleJobSucceeded(this, dispatcherName)
} }
private fun handleFailure() { private fun handleFailure(dispatcherName: String) {
this.delegate?.handleJobFailed(this, Exception("One or more jobs resulted in failure")) Log.i(TAG, "Handling failure of ${failures.size} messages (${messages.size - failures.size} processed successfully)")
this.delegate?.handleJobFailed(this, dispatcherName, Exception("One or more jobs resulted in failure"))
} }
override fun serialize(): Data { override fun serialize(): Data {

View File

@ -12,7 +12,7 @@ class GroupAvatarDownloadJob(val room: String, val server: String) : Job {
override var failureCount: Int = 0 override var failureCount: Int = 0
override val maxFailureCount: Int = 10 override val maxFailureCount: Int = 10
override fun execute() { override fun execute(dispatcherName: String) {
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
val imageId = storage.getOpenGroup(room, server)?.imageId ?: return val imageId = storage.getOpenGroup(room, server)?.imageId ?: return
try { try {
@ -20,9 +20,9 @@ class GroupAvatarDownloadJob(val room: String, val server: String) : Job {
val groupId = GroupUtil.getEncodedOpenGroupID("$server.$room".toByteArray()) val groupId = GroupUtil.getEncodedOpenGroupID("$server.$room".toByteArray())
storage.updateProfilePicture(groupId, bytes) storage.updateProfilePicture(groupId, bytes)
storage.updateTimestampUpdated(groupId, System.currentTimeMillis()) storage.updateTimestampUpdated(groupId, System.currentTimeMillis())
delegate?.handleJobSucceeded(this) delegate?.handleJobSucceeded(this, dispatcherName)
} catch (e: Exception) { } catch (e: Exception) {
delegate?.handleJobFailed(this, e) delegate?.handleJobFailed(this, dispatcherName, e)
} }
} }

View File

@ -17,7 +17,7 @@ interface Job {
internal const val MAX_BUFFER_SIZE = 1_000_000 // bytes internal const val MAX_BUFFER_SIZE = 1_000_000 // bytes
} }
fun execute() fun execute(dispatcherName: String)
fun serialize(): Data fun serialize(): Data

View File

@ -2,7 +2,7 @@ package org.session.libsession.messaging.jobs
interface JobDelegate { interface JobDelegate {
fun handleJobSucceeded(job: Job) fun handleJobSucceeded(job: Job, dispatcherName: String)
fun handleJobFailed(job: Job, error: Exception) fun handleJobFailed(job: Job, dispatcherName: String, error: Exception)
fun handleJobFailedPermanently(job: Job, error: Exception) fun handleJobFailedPermanently(job: Job, dispatcherName: String, error: Exception)
} }

View File

@ -53,7 +53,7 @@ class JobQueue : JobDelegate {
} }
if (openGroupId.isNullOrEmpty()) { if (openGroupId.isNullOrEmpty()) {
Log.e("OpenGroupDispatcher", "Open Group ID was null on ${job.javaClass.simpleName}") Log.e("OpenGroupDispatcher", "Open Group ID was null on ${job.javaClass.simpleName}")
handleJobFailedPermanently(job, NullPointerException("Open Group ID was null")) handleJobFailedPermanently(job, name, NullPointerException("Open Group ID was null"))
} else { } else {
val groupChannel = if (!openGroupChannels.containsKey(openGroupId)) { val groupChannel = if (!openGroupChannels.containsKey(openGroupId)) {
Log.d("OpenGroupDispatcher", "Creating ${openGroupId.hashCode()} channel") Log.d("OpenGroupDispatcher", "Creating ${openGroupId.hashCode()} channel")
@ -95,9 +95,16 @@ class JobQueue : JobDelegate {
} }
private fun Job.process(dispatcherName: String) { private fun Job.process(dispatcherName: String) {
Log.d(dispatcherName,"processJob: ${javaClass.simpleName}") Log.d(dispatcherName,"processJob: ${javaClass.simpleName} (id: $id)")
delegate = this@JobQueue delegate = this@JobQueue
execute()
try {
execute(dispatcherName)
}
catch (e: Exception) {
Log.d(dispatcherName, "unhandledJobException: ${javaClass.simpleName} (id: $id)")
this@JobQueue.handleJobFailed(this, dispatcherName, e)
}
} }
init { init {
@ -177,7 +184,7 @@ class JobQueue : JobDelegate {
return return
} }
if (!pendingJobIds.add(id)) { if (!pendingJobIds.add(id)) {
Log.e("Loki","tried to re-queue pending/in-progress job") Log.e("Loki","tried to re-queue pending/in-progress job (id: $id)")
return return
} }
queue.trySend(job) queue.trySend(job)
@ -196,7 +203,7 @@ class JobQueue : JobDelegate {
} }
} }
pendingJobs.sortedBy { it.id }.forEach { job -> pendingJobs.sortedBy { it.id }.forEach { job ->
Log.i("Loki", "Resuming pending job of type: ${job::class.simpleName}.") Log.i("Loki", "Resuming pending job of type: ${job::class.simpleName} (id: ${job.id}).")
queue.trySend(job) // Offer always called on unlimited capacity queue.trySend(job) // Offer always called on unlimited capacity
} }
} }
@ -223,21 +230,21 @@ class JobQueue : JobDelegate {
} }
} }
override fun handleJobSucceeded(job: Job) { override fun handleJobSucceeded(job: Job, dispatcherName: String) {
val jobId = job.id ?: return val jobId = job.id ?: return
MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(jobId) MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(jobId)
pendingJobIds.remove(jobId) pendingJobIds.remove(jobId)
} }
override fun handleJobFailed(job: Job, error: Exception) { override fun handleJobFailed(job: Job, dispatcherName: String, error: Exception) {
// Canceled // Canceled
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
if (storage.isJobCanceled(job)) { if (storage.isJobCanceled(job)) {
return Log.i("Loki", "${job::class.simpleName} canceled.") return Log.i("Loki", "${job::class.simpleName} canceled (id: ${job.id}).")
} }
// Message send jobs waiting for the attachment to upload // Message send jobs waiting for the attachment to upload
if (job is MessageSendJob && error is MessageSendJob.AwaitingAttachmentUploadException) { if (job is MessageSendJob && error is MessageSendJob.AwaitingAttachmentUploadException) {
Log.i("Loki", "Message send job waiting for attachment upload to finish.") Log.i("Loki", "Message send job waiting for attachment upload to finish (id: ${job.id}).")
return return
} }
@ -255,21 +262,22 @@ class JobQueue : JobDelegate {
job.failureCount += 1 job.failureCount += 1
if (job.failureCount >= job.maxFailureCount) { if (job.failureCount >= job.maxFailureCount) {
handleJobFailedPermanently(job, error) handleJobFailedPermanently(job, dispatcherName, error)
} else { } else {
storage.persistJob(job) storage.persistJob(job)
val retryInterval = getRetryInterval(job) val retryInterval = getRetryInterval(job)
Log.i("Loki", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).") Log.i("Loki", "${job::class.simpleName} failed (id: ${job.id}); scheduling retry (failure count is ${job.failureCount}).")
timer.schedule(delay = retryInterval) { timer.schedule(delay = retryInterval) {
Log.i("Loki", "Retrying ${job::class.simpleName}.") Log.i("Loki", "Retrying ${job::class.simpleName} (id: ${job.id}).")
queue.trySend(job) queue.trySend(job)
} }
} }
} }
override fun handleJobFailedPermanently(job: Job, error: Exception) { override fun handleJobFailedPermanently(job: Job, dispatcherName: String, error: Exception) {
val jobId = job.id ?: return val jobId = job.id ?: return
handleJobFailedPermanently(jobId) handleJobFailedPermanently(jobId)
Log.d(dispatcherName, "permanentlyFailedJob: ${javaClass.simpleName} (id: ${job.id})")
} }
private fun handleJobFailedPermanently(jobId: String) { private fun handleJobFailedPermanently(jobId: String) {

View File

@ -25,11 +25,11 @@ class MessageReceiveJob(val data: ByteArray, val serverHash: String? = null, val
private val OPEN_GROUP_ID_KEY = "open_group_id" private val OPEN_GROUP_ID_KEY = "open_group_id"
} }
override fun execute() { override fun execute(dispatcherName: String) {
executeAsync().get() executeAsync(dispatcherName).get()
} }
fun executeAsync(): Promise<Unit, Exception> { fun executeAsync(dispatcherName: String): Promise<Unit, Exception> {
val deferred = deferred<Unit, Exception>() val deferred = deferred<Unit, Exception>()
try { try {
val isRetry: Boolean = failureCount != 0 val isRetry: Boolean = failureCount != 0
@ -39,32 +39,32 @@ class MessageReceiveJob(val data: ByteArray, val serverHash: String? = null, val
val (message, proto) = MessageReceiver.parse(this.data, this.openGroupMessageServerID, openGroupPublicKey = serverPublicKey) val (message, proto) = MessageReceiver.parse(this.data, this.openGroupMessageServerID, openGroupPublicKey = serverPublicKey)
message.serverHash = serverHash message.serverHash = serverHash
MessageReceiver.handle(message, proto, this.openGroupID) MessageReceiver.handle(message, proto, this.openGroupID)
this.handleSuccess() this.handleSuccess(dispatcherName)
deferred.resolve(Unit) deferred.resolve(Unit)
} catch (e: Exception) { } catch (e: Exception) {
Log.e(TAG, "Couldn't receive message.", e) Log.e(TAG, "Couldn't receive message.", e)
if (e is MessageReceiver.Error && !e.isRetryable) { if (e is MessageReceiver.Error && !e.isRetryable) {
Log.e("Loki", "Message receive job permanently failed.", e) Log.e("Loki", "Message receive job permanently failed.", e)
this.handlePermanentFailure(e) this.handlePermanentFailure(dispatcherName, e)
} else { } else {
Log.e("Loki", "Couldn't receive message.", e) Log.e("Loki", "Couldn't receive message.", e)
this.handleFailure(e) this.handleFailure(dispatcherName, e)
} }
deferred.resolve(Unit) // The promise is just used to keep track of when we're done deferred.resolve(Unit) // The promise is just used to keep track of when we're done
} }
return deferred.promise return deferred.promise
} }
private fun handleSuccess() { private fun handleSuccess(dispatcherName: String) {
delegate?.handleJobSucceeded(this) delegate?.handleJobSucceeded(this, dispatcherName)
} }
private fun handlePermanentFailure(e: Exception) { private fun handlePermanentFailure(dispatcherName: String, e: Exception) {
delegate?.handleJobFailedPermanently(this, e) delegate?.handleJobFailedPermanently(this, dispatcherName, e)
} }
private fun handleFailure(e: Exception) { private fun handleFailure(dispatcherName: String, e: Exception) {
delegate?.handleJobFailed(this, e) delegate?.handleJobFailed(this, dispatcherName, e)
} }
override fun serialize(): Data { override fun serialize(): Data {

View File

@ -33,7 +33,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
private val DESTINATION_KEY = "destination" private val DESTINATION_KEY = "destination"
} }
override fun execute() { override fun execute(dispatcherName: String) {
val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val message = message as? VisibleMessage val message = message as? VisibleMessage
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
@ -61,12 +61,12 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
} }
} }
if (attachmentsToUpload.isNotEmpty()) { if (attachmentsToUpload.isNotEmpty()) {
this.handleFailure(AwaitingAttachmentUploadException) this.handleFailure(dispatcherName, AwaitingAttachmentUploadException)
return return
} // Wait for all attachments to upload before continuing } // Wait for all attachments to upload before continuing
} }
val promise = MessageSender.send(this.message, this.destination).success { val promise = MessageSender.send(this.message, this.destination).success {
this.handleSuccess() this.handleSuccess(dispatcherName)
}.fail { exception -> }.fail { exception ->
var logStacktrace = true var logStacktrace = true
@ -75,14 +75,14 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
is HTTP.HTTPRequestFailedException -> { is HTTP.HTTPRequestFailedException -> {
logStacktrace = false logStacktrace = false
if (exception.statusCode == 429) { this.handlePermanentFailure(exception) } if (exception.statusCode == 429) { this.handlePermanentFailure(dispatcherName, exception) }
else { this.handleFailure(exception) } else { this.handleFailure(dispatcherName, exception) }
} }
is MessageSender.Error -> { is MessageSender.Error -> {
if (!exception.isRetryable) { this.handlePermanentFailure(exception) } if (!exception.isRetryable) { this.handlePermanentFailure(dispatcherName, exception) }
else { this.handleFailure(exception) } else { this.handleFailure(dispatcherName, exception) }
} }
else -> this.handleFailure(exception) else -> this.handleFailure(dispatcherName, exception)
} }
if (logStacktrace) { Log.e(TAG, "Couldn't send message due to error", exception) } if (logStacktrace) { Log.e(TAG, "Couldn't send message due to error", exception) }
@ -95,15 +95,15 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
} }
} }
private fun handleSuccess() { private fun handleSuccess(dispatcherName: String) {
delegate?.handleJobSucceeded(this) delegate?.handleJobSucceeded(this, dispatcherName)
} }
private fun handlePermanentFailure(error: Exception) { private fun handlePermanentFailure(dispatcherName: String, error: Exception) {
delegate?.handleJobFailedPermanently(this, error) delegate?.handleJobFailedPermanently(this, dispatcherName, error)
} }
private fun handleFailure(error: Exception) { private fun handleFailure(dispatcherName: String, error: Exception) {
Log.w(TAG, "Failed to send $message::class.simpleName.") Log.w(TAG, "Failed to send $message::class.simpleName.")
val message = message as? VisibleMessage val message = message as? VisibleMessage
if (message != null) { if (message != null) {
@ -111,7 +111,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
return // The message has been deleted return // The message has been deleted
} }
} }
delegate?.handleJobFailed(this, error) delegate?.handleJobFailed(this, dispatcherName, error)
} }
override fun serialize(): Data { override fun serialize(): Data {

View File

@ -32,7 +32,7 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job {
private val MESSAGE_KEY = "message" private val MESSAGE_KEY = "message"
} }
override fun execute() { override fun execute(dispatcherName: String) {
val server = PushNotificationAPI.server val server = PushNotificationAPI.server
val parameters = mapOf( "data" to message.data, "send_to" to message.recipient ) val parameters = mapOf( "data" to message.data, "send_to" to message.recipient )
val url = "${server}/notify" val url = "${server}/notify"
@ -48,18 +48,18 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job {
Log.d("Loki", "Couldn't notify PN server due to error: $exception.") Log.d("Loki", "Couldn't notify PN server due to error: $exception.")
} }
}.success { }.success {
handleSuccess() handleSuccess(dispatcherName)
}. fail { }. fail {
handleFailure(it) handleFailure(dispatcherName, it)
} }
} }
private fun handleSuccess() { private fun handleSuccess(dispatcherName: String) {
delegate?.handleJobSucceeded(this) delegate?.handleJobSucceeded(this, dispatcherName)
} }
private fun handleFailure(error: Exception) { private fun handleFailure(dispatcherName: String, error: Exception) {
delegate?.handleJobFailed(this, error) delegate?.handleJobFailed(this, dispatcherName, error)
} }
override fun serialize(): Data { override fun serialize(): Data {

View File

@ -19,7 +19,7 @@ class OpenGroupDeleteJob(private val messageServerIds: LongArray, private val th
override var failureCount: Int = 0 override var failureCount: Int = 0
override val maxFailureCount: Int = 1 override val maxFailureCount: Int = 1
override fun execute() { override fun execute(dispatcherName: String) {
val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val numberToDelete = messageServerIds.size val numberToDelete = messageServerIds.size
Log.d(TAG, "Deleting $numberToDelete messages") Log.d(TAG, "Deleting $numberToDelete messages")
@ -39,10 +39,10 @@ class OpenGroupDeleteJob(private val messageServerIds: LongArray, private val th
} }
Log.d(TAG, "Deleted ${messageIds.first.size + messageIds.second.size} messages successfully") Log.d(TAG, "Deleted ${messageIds.first.size + messageIds.second.size} messages successfully")
delegate?.handleJobSucceeded(this) delegate?.handleJobSucceeded(this, dispatcherName)
} }
catch (e: Exception) { catch (e: Exception) {
delegate?.handleJobFailed(this, e) delegate?.handleJobFailed(this, dispatcherName, e)
} }
} }

View File

@ -20,7 +20,7 @@ class TrimThreadJob(val threadId: Long, val openGroupId: String?) : Job {
const val THREAD_LENGTH_TRIGGER_SIZE = 2000 const val THREAD_LENGTH_TRIGGER_SIZE = 2000
} }
override fun execute() { override fun execute(dispatcherName: String) {
val context = MessagingModuleConfiguration.shared.context val context = MessagingModuleConfiguration.shared.context
val trimmingEnabled = TextSecurePreferences.isThreadLengthTrimmingEnabled(context) val trimmingEnabled = TextSecurePreferences.isThreadLengthTrimmingEnabled(context)
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
@ -29,7 +29,7 @@ class TrimThreadJob(val threadId: Long, val openGroupId: String?) : Job {
val oldestMessageTime = System.currentTimeMillis() - TRIM_TIME_LIMIT val oldestMessageTime = System.currentTimeMillis() - TRIM_TIME_LIMIT
storage.trimThreadBefore(threadId, oldestMessageTime) storage.trimThreadBefore(threadId, oldestMessageTime)
} }
delegate?.handleJobSucceeded(this) delegate?.handleJobSucceeded(this, dispatcherName)
} }
override fun serialize(): Data { override fun serialize(): Data {